Skip to content

Commit

Permalink
Merge branch 'benc-usage-protocol' into update-usage-tracking
Browse files Browse the repository at this point in the history
Merge changes from Ben related to usage tracking update Parsl#3229
  • Loading branch information
NishchayKarle committed Apr 24, 2024
2 parents 7324e29 + 7253fc7 commit 39ac632
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 27 deletions.
6 changes: 5 additions & 1 deletion parsl/config.py
Expand Up @@ -10,11 +10,12 @@
from parsl.errors import ConfigurationError
from parsl.dataflow.taskrecord import TaskRecord
from parsl.monitoring import MonitoringHub
from parsl.usage_tracking.api import UsageInformation

logger = logging.getLogger(__name__)


class Config(RepresentationMixin):
class Config(RepresentationMixin, UsageInformation):
"""
Specification of Parsl configuration options.
Expand Down Expand Up @@ -144,3 +145,6 @@ def _validate_executors(self) -> None:
if len(duplicates) > 0:
raise ConfigurationError('Executors must have unique labels ({})'.format(
', '.join(['label={}'.format(repr(d)) for d in duplicates])))

    def get_usage_information(self) -> dict:
        # Contribution to the usage-tracking "components" report: only the
        # count of configured executors is reported, never their details.
        return {"executors_len": len(self.executors)}
6 changes: 5 additions & 1 deletion parsl/executors/high_throughput/executor.py
Expand Up @@ -14,6 +14,7 @@
import warnings

import parsl.launchers
from parsl.usage_tracking.api import UsageInformation
from parsl.serialize import pack_res_spec_apply_message, deserialize
from parsl.serialize.errors import SerializationError, DeserializationError
from parsl.app.errors import RemoteExceptionWrapper
Expand Down Expand Up @@ -62,7 +63,7 @@
"--available-accelerators {accelerators}")


class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
"""Executor designed for cluster-scale
The HighThroughputExecutor system has the following components:
Expand Down Expand Up @@ -819,3 +820,6 @@ def shutdown(self, timeout: float = 10.0):
self.interchange_proc.kill()

logger.info("Finished HighThroughputExecutor shutdown attempt")

    def get_usage_information(self) -> dict:
        # Contribution to the usage-tracking "components" report: only
        # whether MPI mode is enabled on this executor.
        return {"mpi": self.enable_mpi_mode}
66 changes: 66 additions & 0 deletions parsl/usage_tracking/api.py
@@ -0,0 +1,66 @@
import inspect

from abc import abstractmethod
from functools import singledispatch
from typing import Any, List, Sequence

from parsl.utils import RepresentationMixin


# Traverse the configuration hierarchy, returning a JSON component
# for each one. Configuration components which implement
# RepresentationMixin will be in the right form for inspecting
# object attributes. Configuration components which are lists or tuples
# are traversed in sequence. Other types default to reporting no
# usage information.

@singledispatch
def get_parsl_usage(obj) -> List[Any]:
    """Default case: a component of any unregistered type contributes
    no usage information at all."""
    return []


@get_parsl_usage.register
def get_parsl_usage_representation_mixin(obj: RepresentationMixin) -> List[Any]:
    """Return the usage-report fragment for one RepresentationMixin component.

    A component that also implements UsageInformation is reported as a dict
    of rich attributes; otherwise only its fully qualified class name is
    reported. In both cases, the component's constructor arguments are then
    traversed recursively and their reports appended.

    :param obj: a configuration component implementing RepresentationMixin.
    :return: a list of JSON-renderable entries (dicts and/or strings).
    """
    t = type(obj)
    qualified_name = t.__module__ + "." + t.__name__

    # me can contain anything that can be rendered as JSON
    me: List[Any]

    if isinstance(obj, UsageInformation):
        # report rich usage information for this component
        attrs = {'c': qualified_name}
        attrs.update(obj.get_usage_information())
        me = [attrs]
    else:
        # report only the class name of this component
        me = [qualified_name]

    # Unwrap typeguard-style decoration so that inspect sees the real
    # constructor signature rather than the wrapper's.
    init: Any = type(obj).__init__
    if hasattr(init, '__wrapped__'):
        init = init.__wrapped__

    argspec = inspect.getfullargspec(init)

    # Recurse into every constructor argument (skipping self). This relies
    # on RepresentationMixin's requirement that each constructor argument
    # is stored as a same-named instance attribute.
    for arg in argspec.args[1:]:
        me += get_parsl_usage(getattr(obj, arg))

    return me


@get_parsl_usage.register(list)
@get_parsl_usage.register(tuple)
def get_parsl_usage_sequence(obj: Sequence):
    """Concatenate the usage reports of every element of a list or tuple,
    in element order."""
    reports: List[Any] = []
    for element in obj:
        reports.extend(get_parsl_usage(element))
    return reports


class UsageInformation:
    """Mixin for configuration components that want to report rich usage
    information (a dict of attributes) rather than just their class name.

    NOTE(review): @abstractmethod has no enforcement effect here because
    this class does not use ABCMeta as its metaclass — a subclass that
    fails to override get_usage_information() can still be instantiated,
    and calling the method then returns None. Confirm whether strict
    enforcement (metaclass=ABCMeta) is intended.
    """
    @abstractmethod
    def get_usage_information(self) -> dict:
        """Return a JSON-serializable dict of usage attributes for this
        component, to be merged into the usage-tracking report."""
        pass
60 changes: 35 additions & 25 deletions parsl/usage_tracking/usage.py
Expand Up @@ -7,6 +7,7 @@
import sys
import platform

from parsl.usage_tracking.api import get_parsl_usage
from parsl.utils import setproctitle
from parsl.multiprocessing import ForkProcess
from parsl.dataflow.states import States
Expand All @@ -17,6 +18,13 @@
from typing import Callable
from typing_extensions import ParamSpec

# protocol version byte: when (for example) compression parameters are changed
# that cannot be inferred from the compressed message itself, this version
# ID needs to imply those parameters.

# Earlier protocol versions: b'{' - the original pure-JSON protocol pre-March 2024
PROTOCOL_VERSION = b'1'

P = ParamSpec("P")


Expand All @@ -32,7 +40,7 @@ def run(*args, **kwargs):


@async_process
def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: str) -> None:
def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: bytes) -> None:
"""Send UDP messages to usage tracker asynchronously
This multiprocessing based messenger was written to overcome the limitations
Expand All @@ -46,16 +54,11 @@ def udp_messenger(domain_name: str, UDP_PORT: int, sock_timeout: int, message: s
setproctitle("parsl: Usage tracking")

try:
encoded_message = bytes(message, "utf-8")

UDP_IP = socket.gethostbyname(domain_name)

if UDP_PORT is None:
raise Exception("UDP_PORT is None")

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
sock.settimeout(sock_timeout)
sock.sendto(encoded_message, (UDP_IP, UDP_PORT))
sock.sendto(message, (UDP_IP, UDP_PORT))
sock.close()

except socket.timeout:
Expand Down Expand Up @@ -102,7 +105,7 @@ def __init__(self, dfk, port=50077,
self.procs = []
self.dfk = dfk
self.config = self.dfk.config
self.uuid = str(uuid.uuid4())
self.correlator_uuid = str(uuid.uuid4())
self.parsl_version = PARSL_VERSION
self.python_version = "{}.{}.{}".format(sys.version_info.major,
sys.version_info.minor,
Expand Down Expand Up @@ -130,43 +133,50 @@ def check_tracking_enabled(self):

return track

def construct_start_message(self) -> str:
def construct_start_message(self) -> bytes:
"""Collect preliminary run info at the start of the DFK.
Returns :
- Message dict dumped as json string, ready for UDP
"""
message = {'uuid': self.uuid,
message = {'correlator': self.correlator_uuid,
'parsl_v': self.parsl_version,
'python_v': self.python_version,
'os': platform.system(),
'os_v': platform.release(),
'start': time.time()}
'platform.system': platform.system(),
'start': int(time.time()),
'components': get_parsl_usage(self.dfk._config)}
logger.debug(f"Usage tracking start message: {message}")

return json.dumps(message)
return self.encode_message(message)

def construct_end_message(self) -> str:
def construct_end_message(self) -> bytes:
"""Collect the final run information at the time of DFK cleanup.
Returns:
- Message dict dumped as json string, ready for UDP
"""
app_count = self.dfk.task_count

site_count = len(self.dfk.config.executors)

app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail]

message = {'uuid': self.uuid,
'end': time.time(),
't_apps': app_count,
'sites': site_count,
'failed': app_fails
}
# the DFK is tangled into this code as a god-object, so it is
# handled separately from the usual traversal code, but presenting
# the same protocol-level report.
dfk_component = {'c': type(self.dfk).__module__ + "." + type(self.dfk).__name__,
'app_count': app_count,
'app_fails': app_fails}

message = {'correlator': self.correlator_uuid,
'end': int(time.time()),
'components': [dfk_component] + get_parsl_usage(self.dfk._config)}
logger.debug(f"Usage tracking end message (unencoded): {message}")

return self.encode_message(message)

return json.dumps(message)
    def encode_message(self, obj) -> bytes:
        """Serialize obj as JSON and prefix it with the one-byte protocol
        version, producing the on-the-wire UDP payload."""
        return PROTOCOL_VERSION + json.dumps(obj).encode('utf-8')

def send_UDP_message(self, message: str) -> None:
def send_UDP_message(self, message: bytes) -> None:
"""Send UDP message."""
if self.tracking_enabled:
try:
Expand Down

0 comments on commit 39ac632

Please sign in to comment.