Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report usage using a richer configuration traversing protocol #3229

Merged
merged 9 commits into from Apr 25, 2024
6 changes: 5 additions & 1 deletion parsl/config.py
Expand Up @@ -10,11 +10,12 @@
from parsl.errors import ConfigurationError
from parsl.dataflow.taskrecord import TaskRecord
from parsl.monitoring import MonitoringHub
from parsl.usage_tracking.api import UsageInformation

logger = logging.getLogger(__name__)


class Config(RepresentationMixin):
class Config(RepresentationMixin, UsageInformation):
"""
Specification of Parsl configuration options.

Expand Down Expand Up @@ -144,3 +145,6 @@ def _validate_executors(self) -> None:
if len(duplicates) > 0:
raise ConfigurationError('Executors must have unique labels ({})'.format(
', '.join(['label={}'.format(repr(d)) for d in duplicates])))

def get_usage_information(self):
    """Return the extra usage-report fields for this configuration.

    Returns:
        A dict with a single key, ``"executors_len"``, holding the number
        of entries in ``self.executors``.
    """
    executor_count = len(self.executors)
    return {"executors_len": executor_count}
6 changes: 5 additions & 1 deletion parsl/executors/high_throughput/executor.py
Expand Up @@ -14,6 +14,7 @@
import warnings

import parsl.launchers
from parsl.usage_tracking.api import UsageInformation
from parsl.serialize import pack_res_spec_apply_message, deserialize
from parsl.serialize.errors import SerializationError, DeserializationError
from parsl.app.errors import RemoteExceptionWrapper
Expand Down Expand Up @@ -62,7 +63,7 @@
"--available-accelerators {accelerators}")


class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
"""Executor designed for cluster-scale

The HighThroughputExecutor system has the following components:
Expand Down Expand Up @@ -819,3 +820,6 @@ def shutdown(self, timeout: float = 10.0):
self.interchange_proc.kill()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
    """Return the extra usage-report fields for this executor.

    Returns:
        A dict with a single key, ``"mpi"``, holding the current value of
        ``self.enable_mpi_mode``.
    """
    mpi_mode = self.enable_mpi_mode
    return {"mpi": mpi_mode}
66 changes: 66 additions & 0 deletions parsl/usage_tracking/api.py
@@ -0,0 +1,66 @@
from parsl.utils import RepresentationMixin

from abc import abstractmethod
from functools import singledispatch
from typing import Any, List, Sequence


# Traverse the configuration hierarchy, returning a JSON component
# for each one. Configuration components which implement
benclifford marked this conversation as resolved.
Show resolved Hide resolved
# RepresentationMixin will be in the right form for inspecting
# object attributes. Configuration components which are lists or tuples
# are traversed in sequence. Other types default to reporting no
# usage information.

@singledispatch
def get_parsl_usage(obj) -> List[Any]:
    """Return the usage report for ``obj``.

    This is the fallback used for any type that has no more specific
    handler registered: such objects contribute nothing to the report.

    Returns:
        A new, empty list on every call.
    """
    return list()


@get_parsl_usage.register
def get_parsl_usage_representation_mixin(obj: RepresentationMixin):
    """Build the usage report for a ``RepresentationMixin`` component.

    The report is a flat list. Its first entry describes ``obj`` itself;
    it is followed by the reports of the values found on ``obj`` under
    each positional constructor argument name (``self`` excluded).

    The entry for ``obj`` is:

    * a dict with the fully qualified class name under ``'c'``, updated
      with the result of ``obj.get_usage_information()``, when ``obj`` is
      a ``UsageInformation``;
    * otherwise just the fully qualified class name as a string.

    Raises:
        AttributeError: if ``obj`` has no attribute named after one of
            its constructor arguments (``getattr`` is called without a
            default).
    """
    import inspect

    cls = type(obj)
    class_path = cls.__module__ + "." + cls.__name__

    # Everything placed in the report must be renderable as JSON.
    report: List[Any]
    if isinstance(obj, UsageInformation):
        # Rich report: class name plus the component-supplied fields.
        entry = {'c': class_path}
        entry.update(obj.get_usage_information())
        report = [entry]
    else:
        # Minimal report: the class name only.
        report = [class_path]

    # Undo typeguard-style wrapping: a decorated __init__ exposes the
    # original function as __wrapped__, and that is the one whose
    # argument names should be inspected.
    constructor: Any = getattr(cls.__init__, '__wrapped__', cls.__init__)

    # args[0] is self; each remaining name is read back from obj as an
    # attribute and traversed in turn.
    for arg_name in inspect.getfullargspec(constructor).args[1:]:
        report.extend(get_parsl_usage(getattr(obj, arg_name)))

    return report


@get_parsl_usage.register(list)
@get_parsl_usage.register(tuple)
def get_parsl_usage_sequence(obj: Sequence):
    """Return the usage report for a list or tuple.

    Each element is traversed with ``get_parsl_usage`` and the resulting
    lists are concatenated, in element order, into one flat list.
    """
    return [entry for element in obj for entry in get_parsl_usage(element)]
benclifford marked this conversation as resolved.
Show resolved Hide resolved


class UsageInformation:
    """Marker base class for components that report extra usage fields.

    Instances of this class are recognised by the RepresentationMixin
    handler of ``get_parsl_usage``, which merges the dict returned by
    ``get_usage_information`` into that component's report entry.
    """

    # NOTE: this class does not use an ABCMeta metaclass, so the
    # @abstractmethod marker below is not enforced at instantiation time.
    @abstractmethod
    def get_usage_information(self) -> dict:
        """Return the extra fields to include in this component's report."""
60 changes: 35 additions & 25 deletions parsl/usage_tracking/usage.py
Expand Up @@ -7,6 +7,7 @@
import sys
import platform

from parsl.usage_tracking.api import get_parsl_usage
from parsl.utils import setproctitle
from parsl.multiprocessing import ForkProcess
from parsl.dataflow.states import States
Expand All @@ -17,6 +18,13 @@
from typing import Callable
from typing_extensions import ParamSpec

# protocol version byte: when (for example) compression parameters are changed
# that cannot be inferred from the compressed message itself, this version
# ID needs to imply those parameters.

# Earlier protocol versions: b'{' - the original pure-JSON protocol pre-March 2024
PROTOCOL_VERSION = b'1'

P = ParamSpec("P")


Expand All @@ -32,7 +40,7 @@ def run(*args, **kwargs):


@async_process
def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: str) -> None:
def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: bytes) -> None:
"""Send UDP messages to usage tracker asynchronously

This multiprocessing based messenger was written to overcome the limitations
Expand All @@ -46,16 +54,11 @@ def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: s
setproctitle("parsl: Usage tracking")

try:
encoded_message = bytes(message, "utf-8")

UDP_IP = socket.gethostbyname(domain_name)

if UDP_PORT is None:
raise Exception("UDP_PORT is None")

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
sock.settimeout(sock_timeout)
sock.sendto(encoded_message, (UDP_IP, UDP_PORT))
sock.sendto(message, (UDP_IP, UDP_PORT))
sock.close()

except socket.timeout:
Expand Down Expand Up @@ -102,7 +105,7 @@ def __init__(self, dfk, port=50077,
self.procs = []
self.dfk = dfk
self.config = self.dfk.config
self.uuid = str(uuid.uuid4())
self.correlator_uuid = str(uuid.uuid4())
self.parsl_version = PARSL_VERSION
self.python_version = "{}.{}.{}".format(sys.version_info.major,
sys.version_info.minor,
Expand Down Expand Up @@ -130,43 +133,50 @@ def check_tracking_enabled(self):

return track

def construct_start_message(self) -> str:
def construct_start_message(self) -> bytes:
"""Collect preliminary run info at the start of the DFK.

Returns :
- Message dict dumped as JSON and encoded as bytes with the protocol version prefix, ready for UDP
"""
message = {'uuid': self.uuid,
message = {'correlator': self.correlator_uuid,
'parsl_v': self.parsl_version,
'python_v': self.python_version,
'os': platform.system(),
'os_v': platform.release(),
'start': time.time()}
'platform.system': platform.system(),
'start': int(time.time()),
'components': get_parsl_usage(self.dfk._config)}
logger.debug(f"Usage tracking start message: {message}")

return json.dumps(message)
return self.encode_message(message)

def construct_end_message(self) -> str:
def construct_end_message(self) -> bytes:
"""Collect the final run information at the time of DFK cleanup.

Returns:
- Message dict dumped as JSON and encoded as bytes with the protocol version prefix, ready for UDP
"""
app_count = self.dfk.task_count

site_count = len(self.dfk.config.executors)

app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail]

message = {'uuid': self.uuid,
'end': time.time(),
't_apps': app_count,
'sites': site_count,
'failed': app_fails
}
# the DFK is tangled into this code as a god-object, so it is
# handled separately from the usual traversal code, but presenting
# the same protocol-level report.
dfk_component = {'c': type(self.dfk).__module__ + "." + type(self.dfk).__name__,
'app_count': app_count,
'app_fails': app_fails}

message = {'correlator': self.correlator_uuid,
'end': int(time.time()),
'components': [dfk_component] + get_parsl_usage(self.dfk._config)}
logger.debug(f"Usage tracking end message (unencoded): {message}")

return self.encode_message(message)

return json.dumps(message)
def encode_message(self, obj):
    """Encode a message object into the bytes sent to the usage tracker.

    Args:
        obj: a JSON-serialisable object (the message dict).

    Returns:
        ``PROTOCOL_VERSION`` followed by the UTF-8 encoding of ``obj``
        dumped as JSON.
    """
    payload = json.dumps(obj).encode('utf-8')
    return PROTOCOL_VERSION + payload

benclifford marked this conversation as resolved.
Show resolved Hide resolved
def send_UDP_message(self, message: str) -> None:
def send_UDP_message(self, message: bytes) -> None:
"""Send UDP message."""
if self.tracking_enabled:
try:
Expand Down