Skip to content

Commit

Permalink
support mac
Browse files Browse the repository at this point in the history
  • Loading branch information
bghimire committed Sep 4, 2013
1 parent 036f439 commit e013084
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions python/hpx/process.py
Expand Up @@ -10,22 +10,32 @@
# TODO: Match threading.Thread interface and/or subprocess interface better?
# TODO: Better exception propagation

from sys import float_info
from sys import float_info, platform
from threading import Thread, Lock
from time import sleep, time
from subprocess import Popen, STDOUT, PIPE
from types import StringType
from shlex import split
from signal import SIGKILL
from os import kill, waitpid, WNOHANG
from select import epoll, EPOLLHUP
from platform import system
from Queue import Queue, Empty
from errno import ESRCH

import select
# TODO: implement for Windows

if "Linux" == system():
OS_MAC = False
OS_LIN = False

if platform.startswith('darwin'):
OS_MAC = True

if platform.startswith('linux'):
OS_LIN = True



if "Linux" == system():
def kill_process_tree(parent_pid, signal=SIGKILL):
cmd = "ps -o pid --ppid %d --noheaders" % parent_pid
ps_command = Popen(cmd, shell=True, stdout=PIPE)
Expand Down Expand Up @@ -169,18 +179,26 @@ class process_group(object):
def __init__(self, *cmds):
self._lock = Lock()
self._members = {}
self._poller = epoll()

if OS_MAC:
self._poller = select.kqueue()
if OS_LIN:
self._poller = select.epoll()

for cmd in cmds:
self.create_process(cmd)

def create_process(self, cmd):
return process(cmd, self)

def add_process(self, job):
with self._lock:
self._members[job.fileno()] = job
self._poller.register(job._proc.stdout, EPOLLHUP)
if OS_MAC:
self._poller.control([select.kevent(job._proc.stdout,
select.KQ_FILTER_READ, select.KQ_EV_ADD,
select.KQ_NOTE_LOWAT,0)],0)
if OS_LIN:
self._poller.register(job._proc.stdout, select.EPOLLHUP)

def join_all(self, timeout=None, callback=None):
with self._lock:
Expand All @@ -189,20 +207,36 @@ def join_all(self, timeout=None, callback=None):
started = time()

while timeout is None or not float_info.epsilon > timeout:
ready = self._poller.poll(timeout=-1.0 if timeout is None else timeout)

if not timeout is None:
timeout -= (time() - started)

for fd, flags in ready:
self._poller.unregister(fd)
not_done.pop(fd)
if OS_MAC:

if timeout == None:
timeout=-1.0

ready = self._poller.control(None,1,timeout)
if OS_LIN:
ready = self._poller.poll(timeout=-1.0 if timeout is None else timeout)

if not timeout is None:
timeout -= (time() - started)

if OS_MAC:
for fd in ready:
fd = fd.ident
self._poller.control([select.kevent(fd,select.KQ_FILTER_READ,
select.KQ_EV_DELETE)],0)
not_done.pop(fd)

if OS_LIN:
for fd, flags in ready:
self._poller.unregister(fd)
not_done.pop(fd)


if callable(callback):
callback(fd, self._members[fd])

if 0 == len(not_done):
return
if 0 == len(not_done):
return

# some of the jobs are not done, we'll have to forcefully stop them
for fd in not_done:
Expand Down Expand Up @@ -275,3 +309,7 @@ def read_callback(fd, job):

return output





0 comments on commit e013084

Please sign in to comment.