diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index c115621591..7e738df9a6 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -561,6 +561,7 @@ def cleanup(self): # Send final stats self.usage_tracker.send_message() + self.usage_tracker.close() # We do not need to cleanup if the executors are managed outside # the DFK if not self._executors_managed: diff --git a/parsl/dataflow/usage_tracking/usage.py b/parsl/dataflow/usage_tracking/usage.py index 1e8b20c9c3..604882f114 100644 --- a/parsl/dataflow/usage_tracking/usage.py +++ b/parsl/dataflow/usage_tracking/usage.py @@ -7,6 +7,7 @@ import logging import socket import sys +import multiprocessing as mp from parsl.dataflow.states import States from parsl.version import VERSION as PARSL_VERSION @@ -14,6 +15,55 @@ logger = logging.getLogger(__name__) +def async_process(fn): + """ Decorator function to launch a function as a separate process """ + + def run(*args, **kwargs): + proc = mp.Process(target=fn, args=args, kwargs=kwargs) + proc.start() + return proc + + return run + + +@async_process +def udp_messenger(domain_name, UDP_IP, UDP_PORT, sock_timeout, message): + """Send UDP messages to usage tracker asynchronously + + This multiprocessing based messenger was written to overcome the limitations + of signalling/terminating a thread that is blocked on a system call. This + messenger is created as a separate process, and initialized with 2 queues, + to_send to receive messages to be sent to the internet, and done to let the + main thread know return status. + + Args: + - domain_name (str) : Domain name string + - UDP_IP (str) : IP address YYY.YYY.YYY.YYY + - UDP_PORT (int) : UDP port to send out on + - sock_timeout (int) : Socket timeout + - to_send (multiprocessing.Queue) : Queue of outgoing messages to internet + - done (multiprocessing.Queue) : Queue of status to be received by main process + """ + + if domain_name: + try: + UDP_IP = socket.gethostbyname(domain_name) + except Exception: + # (False, "Domain lookup failed, defaulting to {0}".format(UDP_IP)) + pass + + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP + sock.settimeout(sock_timeout) + sock.sendto(bytes(message, "utf-8"), (UDP_IP, UDP_PORT)) + sock.close() + + except socket.timeout: + done.put_nowait((False, "Sending usage-tracking info timed-out")) + except OSError as e: + done.put_nowait((False, "Unable to reach the network to send usage data {}".format(e))) + + class UsageTracker (object): """Anonymized Usage Tracking for Parsl. @@ -27,6 +77,11 @@ def __init__(self, dfk, ip='52.3.111.203', port=50077, domain_name='tracking.parsl-project.org'): """Initialize usage tracking unless the user has opted-out. + We will try to resolve the hostname specified in kwarg:domain_name + and if that fails attempt to use the kwarg:ip. Determining the + IP and sending message is threaded to avoid slowing down DFK + initialization. + Tracks usage stats by inspecting the internal state of the dfk. Args: @@ -38,15 +93,20 @@ def __init__(self, dfk, ip='52.3.111.203', port=50077, - domain_name (string) : Domain name, will override IP Default: tracking.parsl-project.org """ - if domain_name: - try: - self.UDP_IP = socket.gethostbyname(domain_name) - except Exception: - logging.debug("Could not lookup domain_name, defaulting to 52.3.111.203") - self.UDP_IP = ip - else: - self.UDP_IP = ip + + self.domain_name = domain_name + self.ip = ip + # The sock timeout will only apply to UDP send and not domain resolution + self.sock_timeout = 5 self.UDP_PORT = port + self.UDP_IP = None + + self._to_send = mp.Queue() + self._done = mp.Queue() + # self.proc = mp.Process(target=udp_messenger, args=(self.domain_name, self.UDP_IP, self.UDP_PORT, + # self.sock_timeout, self._to_send, self._done,)) + # self.proc.start() + self.procs = [] self.dfk = dfk self.config = self.dfk.config self.uuid = str(uuid.uuid4()) @@ -134,18 +194,22 @@ def construct_end_message(self): def send_UDP_message(self, message): """Send UDP message.""" + x = 0 if self.tracking_enabled: + """ + self._to_send.put_nowait([message]) + try: + msg = self._done.get(False, 1) + if not msg[0] : + logger.debug(msg[1]) + except queue.Empty: + pass + """ try: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP - sock.settimeout(1.0) - x = sock.sendto(bytes(message, "utf-8"), (self.UDP_IP, self.UDP_PORT)) - sock.close() - except socket.timeout: - logger.debug("Sending usage-tracking info timed-out") - x = 0 - except OSError: - logger.debug("Unable to reach the network to send usage data") - x = 0 + proc = udp_messenger(self.domain_name, self.UDP_IP, self.UDP_PORT, self.sock_timeout, message) + self.procs.append(proc) + except Exception as e: + logger.debug("Usage tracking failed: {}".format(e)) else: x = -1 @@ -167,22 +231,26 @@ def send_message(self): else: message = self.construct_end_message() - x = self.send_UDP_message(message) + self.send_UDP_message(message) end = time.time() - return x, end - start + return end - start + + def __del__(self): + return self.close() + + def close(self): + """We terminate (SIGTERM) the processes added to the self.procs list """ + for proc in self.procs: + proc.terminate() if __name__ == '__main__': from parsl import * + set_stream_logger() workers = ThreadPoolExecutor(max_workers=4) dfk = DataFlowKernel(executors=[workers]) - # ut = UsageTracker(dfk, ip='52.3.111.203') - ut = UsageTracker(dfk, domain_name='tracking.parsl-project.org') - - for i in range(0, 2): - x = ut.send_message() - print(x) + dfk.cleanup()