Skip to content

Commit

Permalink
Moving UDP messaging for usage_tracking to separate process. Fixed #220
Browse files Browse the repository at this point in the history
Added decorator to launch fn on a new process.
Cleaner isolated udp_messenger code.
Added cleanup code that the DFK triggers during its own cleanup.
  • Loading branch information
yadudoc committed May 11, 2018
1 parent ce7cdcb commit dfccb8a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 26 deletions.
1 change: 1 addition & 0 deletions parsl/dataflow/dflow.py
Expand Up @@ -561,6 +561,7 @@ def cleanup(self):

# Send final stats
self.usage_tracker.send_message()
self.usage_tracker.close()
# We do not need to cleanup if the executors are managed outside
# the DFK
if not self._executors_managed:
Expand Down
120 changes: 94 additions & 26 deletions parsl/dataflow/usage_tracking/usage.py
Expand Up @@ -7,13 +7,63 @@
import logging
import socket
import sys
import multiprocessing as mp

from parsl.dataflow.states import States
from parsl.version import VERSION as PARSL_VERSION

logger = logging.getLogger(__name__)


def async_process(fn):
    """Decorator that launches the wrapped function in a separate process.

    Calling the decorated function does not run ``fn`` in the caller; it
    creates a ``multiprocessing.Process`` with ``fn`` as its target, starts
    it, and hands the process object back so that the caller can later
    join or terminate it.

    Args:
        - fn (callable) : Function to run in the child process

    Returns:
        - A wrapper that accepts the same arguments as ``fn`` and returns
          the started ``multiprocessing.Process``. The return value of
          ``fn`` itself is not made available to the caller.
    """
    # Function-scope import so this block does not depend on a change to
    # the file-level import list.
    import functools

    # wraps() keeps the name and docstring of fn on the wrapper, so logs,
    # help() and introspection still identify the real function.
    # NOTE(review): when a start method that pickles the target is in use,
    # decorating a module-level function may not be picklable -- confirm
    # on the platforms this has to support.
    @functools.wraps(fn)
    def run(*args, **kwargs):
        proc = mp.Process(target=fn, args=args, kwargs=kwargs)
        proc.start()
        return proc

    return run


@async_process
def udp_messenger(domain_name, UDP_IP, UDP_PORT, sock_timeout, message):
    """Send a single UDP message to the usage tracker.

    Because of the ``async_process`` decorator, calling this function
    starts a new process that runs the body below and returns the process
    object to the caller. A separate process is used, rather than a thread,
    to overcome the limitations of signalling/terminating a thread that is
    blocked on a system call.

    Sending is best-effort: every failure is logged at debug level and
    otherwise ignored.

    Args:
        - domain_name (str) : Domain name string; when it resolves, the
          resolved address replaces UDP_IP
        - UDP_IP (str) : Fallback IP address YYY.YYY.YYY.YYY
        - UDP_PORT (int) : UDP port to send out on
        - sock_timeout (int) : Socket timeout; applies to the send only,
          not to the domain lookup
        - message (str) : Payload, encoded as UTF-8 before sending
    """

    if domain_name:
        try:
            UDP_IP = socket.gethostbyname(domain_name)
        except Exception as e:
            # Deliberately broad: a failed lookup must never abort the
            # send, we simply fall back to the address passed in.
            logger.debug("Domain lookup failed, defaulting to {0}: {1}".format(UDP_IP, e))

    if not UDP_IP:
        # Neither the lookup nor the caller supplied an address, so there
        # is nowhere to send to.
        logger.debug("No address available to send usage data to")
        return

    try:
        # The context manager closes the socket even when sendto() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:  # UDP
            sock.settimeout(sock_timeout)
            sock.sendto(bytes(message, "utf-8"), (UDP_IP, UDP_PORT))

    except socket.timeout:
        logger.debug("Sending usage-tracking info timed-out")
    except OSError as e:
        logger.debug("Unable to reach the network to send usage data {}".format(e))


class UsageTracker (object):
"""Anonymized Usage Tracking for Parsl.
Expand All @@ -27,6 +77,11 @@ def __init__(self, dfk, ip='52.3.111.203', port=50077,
domain_name='tracking.parsl-project.org'):
"""Initialize usage tracking unless the user has opted-out.
We will try to resolve the hostname specified in kwarg:domain_name
and if that fails attempt to use the kwarg:ip. Determining the
IP and sending message is threaded to avoid slowing down DFK
initialization.
Tracks usage stats by inspecting the internal state of the dfk.
Args:
Expand All @@ -38,15 +93,20 @@ def __init__(self, dfk, ip='52.3.111.203', port=50077,
- domain_name (string) : Domain name, will override IP
Default: tracking.parsl-project.org
"""
if domain_name:
try:
self.UDP_IP = socket.gethostbyname(domain_name)
except Exception:
logging.debug("Could not lookup domain_name, defaulting to 52.3.111.203")
self.UDP_IP = ip
else:
self.UDP_IP = ip

self.domain_name = domain_name
self.ip = ip
# The sock timeout will only apply to UDP send and not domain resolution
self.sock_timeout = 5
self.UDP_PORT = port
self.UDP_IP = None

self._to_send = mp.Queue()
self._done = mp.Queue()
# self.proc = mp.Process(target=udp_messenger, args=(self.domain_name, self.UDP_IP, self.UDP_PORT,
# self.sock_timeout, self._to_send, self._done,))
# self.proc.start()
self.procs = []
self.dfk = dfk
self.config = self.dfk.config
self.uuid = str(uuid.uuid4())
Expand Down Expand Up @@ -134,18 +194,22 @@ def construct_end_message(self):

def send_UDP_message(self, message):
"""Send UDP message."""
x = 0
if self.tracking_enabled:
"""
self._to_send.put_nowait([message])
try:
msg = self._done.get(False, 1)
if not msg[0] :
logger.debug(msg[1])
except queue.Empty:
pass
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
sock.settimeout(1.0)
x = sock.sendto(bytes(message, "utf-8"), (self.UDP_IP, self.UDP_PORT))
sock.close()
except socket.timeout:
logger.debug("Sending usage-tracking info timed-out")
x = 0
except OSError:
logger.debug("Unable to reach the network to send usage data")
x = 0
proc = udp_messenger(self.domain_name, self.UDP_IP, self.UDP_PORT, self.sock_timeout, message)
self.procs.append(proc)
except Exception as e:
logger.debug("Usage tracking failed: {}".format(e))
else:
x = -1

Expand All @@ -167,22 +231,26 @@ def send_message(self):
else:
message = self.construct_end_message()

x = self.send_UDP_message(message)
self.send_UDP_message(message)
end = time.time()

return x, end - start
return end - start

def __del__(self):
    """Release the messenger processes when the tracker is collected."""
    return self.close()

def close(self):
    """Terminate (SIGTERM) the processes recorded in self.procs.

    Safe to call more than once, and safe to call on an instance whose
    ``procs`` attribute was never set; this matters because ``__del__``
    calls this method even when construction did not complete.
    """
    for proc in getattr(self, "procs", None) or []:
        # is_alive() is False for children that already exited, so only
        # processes that are still running get signalled.
        if proc.is_alive():
            proc.terminate()
            # Bounded wait so that a terminated child is collected
            # without letting cleanup hang indefinitely.
            proc.join(timeout=1)
    # Drop the handles so a repeated close() has nothing left to do.
    self.procs = []


if __name__ == '__main__':
    # Manual smoke test: build a DataFlowKernel on a small thread pool,
    # send two usage messages through a tracker and print what
    # send_message() returns for each, then clean the kernel up.
    from parsl import *

    set_stream_logger()
    pool = ThreadPoolExecutor(max_workers=4)
    dfk = DataFlowKernel(executors=[pool])

    # Alternative setup that targets a fixed IP instead of a domain name:
    # ut = UsageTracker(dfk, ip='52.3.111.203')
    ut = UsageTracker(dfk, domain_name='tracking.parsl-project.org')

    attempts = 2
    for _ in range(attempts):
        print(ut.send_message())
    dfk.cleanup()

0 comments on commit dfccb8a

Please sign in to comment.