
Complete fork of multiprocessing 2.7 with no-execv patch and Celery pool

1 parent 7cea348 commit a5846b13412cddbf6b5a131923208acc2cea561a @ask committed
Sorry, we could not display the entire diff because it was too big.
80 CHANGES.txt
@@ -0,0 +1,80 @@
+2.6.2.1 -- 2009-07-30
+---------------------
+
+ * Issues #5155, 5313, 5331: multiprocessing.Process._bootstrap was
+ unconditionally calling "os.close(sys.stdin.fileno())" resulting in file
+ descriptor errors
+
+ * Issue #5400: Added patch for multiprocessing compilation/support on NetBSD
+
+ * Fix and properly document the multiprocessing module's logging
+ support, expose the internal levels and provide proper usage
+ examples.
+
+ * Issue #5261: Patch multiprocessing's semaphore.c to support context
+ manager use: "with multiprocessing.Lock()" works now (see the usage
+ sketch after this file).
+
+ * Issue #3321: _multiprocessing.Connection() doesn't check handle; added checks
+ for *nix machines for negative handles and large int handles. Without this
+ check it is possible to segfault the interpreter.
+
+ * Issue #4301: Patch the logging module to add processName support, remove
+ _check_logger_class from multiprocessing.
+
+
+2.6.1.1 -- 2009-02-07
+---------------------
+
+ * Fixed an issue with `make doc`
+
+ * mp docs - fix issues 4012, 3518, 4193 (Python svn: r67419)
+
+ * issue4238: bsd support for cpu_count (Python svn: r67423)
+
+ * Move the definition of ``int sval`` into the branch of the #ifdef where
+ it is used; otherwise you get a warning about an unused variable.
+ (Python svn: r67440)
+
+ * Fixed a segfault in connection_recvbytes_into() which occurred
+ with Python debug builds on 64bit Linux.
+
+ * Added reference to `Issue 1683 <http://bugs.python.org/issue1683>`_.
+
+ * issue 4301: patch logging to add processName, remove the old
+ _check_logger_class code (Python svn: r68737)
+
+ * Resolve issue 3321: (segfault) _multiprocessing.Connection()
+ doesn't check handle (Python svn: r68768)
+
+ * Documentation update
+
+
+2.6.0.2 -- 2008-11-27
+---------------------
+
+The release is based on 2.6.0+ and contains additional fixes
+from Python svn.
+
+ * Issue #5: Added monkey patch to make the threading module forward
+ compatible with Python 2.6 and 3.0.
+
+ * Python Issue #4204: Fixed a compilation issue on FreeBSD 4.
+
+ * Removed ``install`` target from Makefile.
+
+ * Updated comments in Modules/mmapmodule.c. The modifications
+ and their origin are clearly marked now.
+
+ * Added sphinx builder for docs and new make target ``docs``.
+
+ * Changed version scheme to Python.version.number.internal_revision
+
+ * Pulled doc fixes from Python svn: r67189, r67330, r67332
+
+
+2.6.0-0.1 -- 2008-10-27
+-----------------------
+
+The release is based on Python 2.6.0.
+
+ * Initial release
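
The Issue #5261 entry above adds context-manager support to the
synchronization primitives. A minimal usage sketch of what that patch
enables (illustrative only, not part of the commit itself)::

    import multiprocessing

    lock = multiprocessing.Lock()

    # Instead of pairing lock.acquire() with lock.release() by hand,
    # the patched semaphore type can be used in a with-statement:
    with lock:
        print 'holding the lock'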
187 Doc/conf.py
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+#
+# multiprocessing documentation build configuration file, created by
+# sphinx-quickstart on Wed Nov 26 12:47:00 2008.
+#
+# This file is execfile()d with the current directory set to its containing dir.
+#
+# The contents of this file are pickled, so don't put values in the namespace
+# that aren't pickleable (module imports are okay, they're removed automatically).
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+import sys, os
+
+# If your extensions are in another directory, add it here. If the directory
+# is relative to the documentation root, use os.path.abspath to make it
+# absolute, like shown here.
+#sys.path.append(os.path.abspath('.'))
+
+# General configuration
+# ---------------------
+
+# Add any Sphinx extension module names here, as strings. They can be extensions
+# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
+extensions = ['sphinx.ext.autodoc']
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['templates']
+
+# The suffix of source filenames.
+source_suffix = '.rst'
+
+# The encoding of source files.
+#source_encoding = 'utf-8'
+
+# The master toctree document.
+master_doc = 'index'
+
+# General information about the project.
+project = u'multiprocessing'
+copyright = u'2008, Python Software Foundation'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+version = '2.6.0.2'
+# The full version, including alpha/beta/rc tags.
+release = '2.6.0.2'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#language = None
+
+# There are two options for replacing |today|: either, you set today to some
+# non-false value, then it is used:
+#today = ''
+# Else, today_fmt is used as the format for a strftime call.
+#today_fmt = '%B %d, %Y'
+
+# List of documents that shouldn't be included in the build.
+#unused_docs = []
+
+# List of directories, relative to source directory, that shouldn't be searched
+# for source files.
+exclude_trees = ['build']
+
+# The reST default role (used for this markup: `text`) to use for all documents.
+#default_role = None
+
+# If true, '()' will be appended to :func: etc. cross-reference text.
+#add_function_parentheses = True
+
+# If true, the current module name will be prepended to all description
+# unit titles (such as .. function::).
+#add_module_names = True
+
+# If true, sectionauthor and moduleauthor directives will be shown in the
+# output. They are ignored by default.
+#show_authors = False
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+
+# Options for HTML output
+# -----------------------
+
+# The style sheet to use for HTML and HTML Help pages. A file of that name
+# must exist either in Sphinx' static/ path, or in one of the custom paths
+# given in html_static_path.
+html_style = 'default.css'
+
+# The name for this set of Sphinx documents. If None, it defaults to
+# "<project> v<release> documentation".
+#html_title = None
+
+# A shorter title for the navigation bar. Default is the same as html_title.
+#html_short_title = None
+
+# The name of an image file (relative to this directory) to place at the top
+# of the sidebar.
+#html_logo = None
+
+# The name of an image file (within the static path) to use as favicon of the
+# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
+# pixels large.
+#html_favicon = None
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['static']
+
+# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
+# using the given strftime format.
+#html_last_updated_fmt = '%b %d, %Y'
+
+# If true, SmartyPants will be used to convert quotes and dashes to
+# typographically correct entities.
+#html_use_smartypants = True
+
+# Custom sidebar templates, maps document names to template names.
+#html_sidebars = {}
+
+# Additional templates that should be rendered to pages, maps page names to
+# template names.
+#html_additional_pages = {}
+
+# If false, no module index is generated.
+#html_use_modindex = True
+
+# If false, no index is generated.
+#html_use_index = True
+
+# If true, the index is split into individual pages for each letter.
+#html_split_index = False
+
+# If true, the reST sources are included in the HTML build as _sources/<name>.
+#html_copy_source = True
+
+# If true, an OpenSearch description file will be output, and all pages will
+# contain a <link> tag referring to it. The value of this option must be the
+# base URL from which the finished HTML is served.
+#html_use_opensearch = ''
+
+# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
+#html_file_suffix = ''
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'multiprocessingdoc'
+
+
+# Options for LaTeX output
+# ------------------------
+
+# The paper size ('letter' or 'a4').
+#latex_paper_size = 'letter'
+
+# The font size ('10pt', '11pt' or '12pt').
+#latex_font_size = '10pt'
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title, author, document class [howto/manual]).
+latex_documents = [
+ ('index', 'multiprocessing.tex', ur'multiprocessing Documentation',
+ ur'Python Software Foundation', 'manual'),
+]
+
+# The name of an image file (relative to this directory) to place at the top of
+# the title page.
+#latex_logo = None
+
+# For "manual" documents, if this is true, then toplevel headings are parts,
+# not chapters.
+#latex_use_parts = False
+
+# Additional stuff for the LaTeX preamble.
+#latex_preamble = ''
+
+# Documents to append as an appendix to all manuals.
+#latex_appendices = []
+
+# If false, no module index is generated.
+#latex_use_modindex = True
40 Doc/glossary.rst
@@ -0,0 +1,40 @@
+.. _glossary:
+
+********
+Glossary
+********
+
+.. glossary::
+
+ bytecode
+ Python source code is compiled into bytecode, the internal representation
+ of a Python program in the interpreter. The bytecode is also cached in
+ ``.pyc`` and ``.pyo`` files so that executing the same file is faster the
+ second time (recompilation from source to bytecode can be avoided). This
+ "intermediate language" is said to run on a :term:`virtual machine`
+ that executes the machine code corresponding to each bytecode.
+
+ CPython
+ The canonical implementation of the Python programming language. The
+ term "CPython" is used in contexts when necessary to distinguish this
+ implementation from others such as Jython or IronPython.
+
+ GIL
+ See :term:`global interpreter lock`.
+
+ global interpreter lock
+ The lock used by Python threads to assure that only one thread
+ executes in the :term:`CPython` :term:`virtual machine` at a time.
+ This simplifies the CPython implementation by assuring that no two
+ threads can access the same memory at the same time. Locking the
+ entire interpreter makes it easier for the interpreter to be
+ multi-threaded, at the expense of much of the parallelism afforded by
+ multi-processor machines. Efforts have been made in the past to
+ create a "free-threaded" interpreter (one which locks shared data at a
+ much finer granularity), but so far none have been successful because
+ performance suffered in the common single-processor case. (A short
+ sketch contrasting threads with multiprocessing follows this glossary.)
+
+ virtual machine
+ A computer defined entirely in software. Python's virtual machine
+ executes the :term:`bytecode` emitted by the bytecode compiler.
+
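As a hedged illustration of the trade-off described under "global
interpreter lock": CPU-bound threads serialize on the GIL, whereas the
multiprocessing package runs its workers in separate interpreter
processes, each with its own GIL (a sketch, not part of the commit)::

    import multiprocessing

    def busy(n):
        # CPU-bound loop; several threads running this would serialize
        # on the GIL, but each pool worker is a separate process.
        total = 0
        for i in xrange(n):
            total += i * i
        return total

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=4)
        print pool.map(busy, [10 ** 6] * 4)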
2 Doc/includes/__init__.py
@@ -0,0 +1,2 @@
+# package
+
238 Doc/includes/mp_benchmarks.py
@@ -0,0 +1,238 @@
+#
+# Simple benchmarks for the multiprocessing package
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+import time, sys, multiprocessing, threading, Queue, gc
+
+if sys.platform == 'win32':
+ _timer = time.clock
+else:
+ _timer = time.time
+
+delta = 1
+
+
+#### TEST_QUEUESPEED
+
+def queuespeed_func(q, c, iterations):
+ a = '0' * 256
+ c.acquire()
+ c.notify()
+ c.release()
+
+ for i in xrange(iterations):
+ q.put(a)
+
+ q.put('STOP')
+
+def test_queuespeed(Process, q, c):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ p = Process(target=queuespeed_func, args=(q, c, iterations))
+ c.acquire()
+ p.start()
+ c.wait()
+ c.release()
+
+ result = None
+ t = _timer()
+
+ while result != 'STOP':
+ result = q.get()
+
+ elapsed = _timer() - t
+
+ p.join()
+
+ print iterations, 'objects passed through the queue in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_PIPESPEED
+
+def pipe_func(c, cond, iterations):
+ a = '0' * 256
+ cond.acquire()
+ cond.notify()
+ cond.release()
+
+ for i in xrange(iterations):
+ c.send(a)
+
+ c.send('STOP')
+
+def test_pipespeed():
+ c, d = multiprocessing.Pipe()
+ cond = multiprocessing.Condition()
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ p = multiprocessing.Process(target=pipe_func,
+ args=(d, cond, iterations))
+ cond.acquire()
+ p.start()
+ cond.wait()
+ cond.release()
+
+ result = None
+ t = _timer()
+
+ while result != 'STOP':
+ result = c.recv()
+
+ elapsed = _timer() - t
+ p.join()
+
+ print iterations, 'objects passed through connection in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_SEQSPEED
+
+def test_seqspeed(seq):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ t = _timer()
+
+ for i in xrange(iterations):
+ a = seq[5]
+
+ elapsed = _timer()-t
+
+ print iterations, 'iterations in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_LOCK
+
+def test_lockspeed(l):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ t = _timer()
+
+ for i in xrange(iterations):
+ l.acquire()
+ l.release()
+
+ elapsed = _timer()-t
+
+ print iterations, 'iterations in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_CONDITION
+
+def conditionspeed_func(c, N):
+ c.acquire()
+ c.notify()
+
+ for i in xrange(N):
+ c.wait()
+ c.notify()
+
+ c.release()
+
+def test_conditionspeed(Process, c):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ c.acquire()
+ p = Process(target=conditionspeed_func, args=(c, iterations))
+ p.start()
+
+ c.wait()
+
+ t = _timer()
+
+ for i in xrange(iterations):
+ c.notify()
+ c.wait()
+
+ elapsed = _timer()-t
+
+ c.release()
+ p.join()
+
+ print iterations * 2, 'waits in', elapsed, 'seconds'
+ print 'average number/sec:', iterations * 2 / elapsed
+
+####
+
+def test():
+ manager = multiprocessing.Manager()
+
+ gc.disable()
+
+ print '\n\t######## testing Queue.Queue\n'
+ test_queuespeed(threading.Thread, Queue.Queue(),
+ threading.Condition())
+ print '\n\t######## testing multiprocessing.Queue\n'
+ test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
+ multiprocessing.Condition())
+ print '\n\t######## testing Queue managed by server process\n'
+ test_queuespeed(multiprocessing.Process, manager.Queue(),
+ manager.Condition())
+ print '\n\t######## testing multiprocessing.Pipe\n'
+ test_pipespeed()
+
+ print
+
+ print '\n\t######## testing list\n'
+ test_seqspeed(range(10))
+ print '\n\t######## testing list managed by server process\n'
+ test_seqspeed(manager.list(range(10)))
+ print '\n\t######## testing Array("i", ..., lock=False)\n'
+ test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
+ print '\n\t######## testing Array("i", ..., lock=True)\n'
+ test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
+
+ print
+
+ print '\n\t######## testing threading.Lock\n'
+ test_lockspeed(threading.Lock())
+ print '\n\t######## testing threading.RLock\n'
+ test_lockspeed(threading.RLock())
+ print '\n\t######## testing multiprocessing.Lock\n'
+ test_lockspeed(multiprocessing.Lock())
+ print '\n\t######## testing multiprocessing.RLock\n'
+ test_lockspeed(multiprocessing.RLock())
+ print '\n\t######## testing lock managed by server process\n'
+ test_lockspeed(manager.Lock())
+ print '\n\t######## testing rlock managed by server process\n'
+ test_lockspeed(manager.RLock())
+
+ print
+
+ print '\n\t######## testing threading.Condition\n'
+ test_conditionspeed(threading.Thread, threading.Condition())
+ print '\n\t######## testing multiprocessing.Condition\n'
+ test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
+ print '\n\t######## testing condition managed by a server process\n'
+ test_conditionspeed(multiprocessing.Process, manager.Condition())
+
+ gc.enable()
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+ test()
364 Doc/includes/mp_distributing.py
@@ -0,0 +1,364 @@
+#
+# Module to allow spawning of processes on a foreign host
+#
+# Depends on `multiprocessing` package -- tested with `processing-0.60`
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
+
+#
+# Imports
+#
+
+import sys
+import os
+import tarfile
+import shutil
+import subprocess
+import logging
+import itertools
+import Queue
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+from multiprocessing import Process, current_process, cpu_count
+from multiprocessing import util, managers, connection, forking, pool
+
+#
+# Logging
+#
+
+def get_logger():
+ return _logger
+
+_logger = logging.getLogger('distributing')
+_logger.propagate = 0
+
+_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
+_handler = logging.StreamHandler()
+_handler.setFormatter(_formatter)
+_logger.addHandler(_handler)
+
+info = _logger.info
+debug = _logger.debug
+
+#
+# Get number of cpus
+#
+
+try:
+ slot_count = cpu_count()
+except NotImplementedError:
+ slot_count = 1
+
+#
+# Manager type which spawns subprocesses
+#
+
+class HostManager(managers.SyncManager):
+ '''
+ Manager type used for spawning processes on a (presumably) foreign host
+ '''
+ def __init__(self, address, authkey):
+ managers.SyncManager.__init__(self, address, authkey)
+ self._name = 'Host-unknown'
+
+ def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+ if hasattr(sys.modules['__main__'], '__file__'):
+ main_path = os.path.basename(sys.modules['__main__'].__file__)
+ else:
+ main_path = None
+ data = pickle.dumps((target, args, kwargs))
+ p = self._RemoteProcess(data, main_path)
+ if name is None:
+ temp = self._name.split('Host-')[-1] + '/Process-%s'
+ name = temp % ':'.join(map(str, p.get_identity()))
+ p.set_name(name)
+ return p
+
+ @classmethod
+ def from_address(cls, address, authkey):
+ manager = cls(address, authkey)
+ managers.transact(address, authkey, 'dummy')
+ manager._state.value = managers.State.STARTED
+ manager._name = 'Host-%s:%s' % manager.address
+ manager.shutdown = util.Finalize(
+ manager, HostManager._finalize_host,
+ args=(manager._address, manager._authkey, manager._name),
+ exitpriority=-10
+ )
+ return manager
+
+ @staticmethod
+ def _finalize_host(address, authkey, name):
+ managers.transact(address, authkey, 'shutdown')
+
+ def __repr__(self):
+ return '<Host(%s)>' % self._name
+
+#
+# Process subclass representing a process on (possibly) a remote machine
+#
+
+class RemoteProcess(Process):
+ '''
+ Represents a process started on a remote host
+ '''
+ def __init__(self, data, main_path):
+ assert not main_path or os.path.basename(main_path) == main_path
+ Process.__init__(self)
+ self._data = data
+ self._main_path = main_path
+
+ def _bootstrap(self):
+ forking.prepare({'main_path': self._main_path})
+ self._target, self._args, self._kwargs = pickle.loads(self._data)
+ return Process._bootstrap(self)
+
+ def get_identity(self):
+ return self._identity
+
+HostManager.register('_RemoteProcess', RemoteProcess)
+
+#
+# A Pool class that uses a cluster
+#
+
+class DistributedPool(pool.Pool):
+
+ def __init__(self, cluster, processes=None, initializer=None, initargs=()):
+ self._cluster = cluster
+ self.Process = cluster.Process
+ pool.Pool.__init__(self, processes or len(cluster),
+ initializer, initargs)
+
+ def _setup_queues(self):
+ self._inqueue = self._cluster._SettableQueue()
+ self._outqueue = self._cluster._SettableQueue()
+ self._quick_put = self._inqueue.put
+ self._quick_get = self._outqueue.get
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ inqueue.set_contents([None] * size)
+
+#
+# Manager type which starts host managers on other machines
+#
+
+def LocalProcess(**kwds):
+ p = Process(**kwds)
+ p.set_name('localhost/' + p.name)
+ return p
+
+class Cluster(managers.SyncManager):
+ '''
+ Represents collection of slots running on various hosts.
+
+ `Cluster` is a subclass of `SyncManager` so it allows creation of
+ various types of shared objects.
+ '''
+ def __init__(self, hostlist, modules):
+ managers.SyncManager.__init__(self, address=('localhost', 0))
+ self._hostlist = hostlist
+ self._modules = modules
+ if __name__ not in modules:
+ modules.append(__name__)
+ files = [sys.modules[name].__file__ for name in modules]
+ for i, file in enumerate(files):
+ if file.endswith('.pyc') or file.endswith('.pyo'):
+ files[i] = file[:-4] + '.py'
+ self._files = [os.path.abspath(file) for file in files]
+
+ def start(self):
+ managers.SyncManager.start(self)
+
+ l = connection.Listener(family='AF_INET', authkey=self._authkey)
+
+ for i, host in enumerate(self._hostlist):
+ host._start_manager(i, self._authkey, l.address, self._files)
+
+ for host in self._hostlist:
+ if host.hostname != 'localhost':
+ conn = l.accept()
+ i, address, cpus = conn.recv()
+ conn.close()
+ other_host = self._hostlist[i]
+ other_host.manager = HostManager.from_address(address,
+ self._authkey)
+ other_host.slots = other_host.slots or cpus
+ other_host.Process = other_host.manager.Process
+ else:
+ host.slots = host.slots or slot_count
+ host.Process = LocalProcess
+
+ self._slotlist = [
+ Slot(host) for host in self._hostlist for i in range(host.slots)
+ ]
+ self._slot_iterator = itertools.cycle(self._slotlist)
+ self._base_shutdown = self.shutdown
+ del self.shutdown
+
+ def shutdown(self):
+ for host in self._hostlist:
+ if host.hostname != 'localhost':
+ host.manager.shutdown()
+ self._base_shutdown()
+
+ def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+ slot = self._slot_iterator.next()
+ return slot.Process(
+ group=group, target=target, name=name, args=args, kwargs=kwargs
+ )
+
+ def Pool(self, processes=None, initializer=None, initargs=()):
+ return DistributedPool(self, processes, initializer, initargs)
+
+ def __getitem__(self, i):
+ return self._slotlist[i]
+
+ def __len__(self):
+ return len(self._slotlist)
+
+ def __iter__(self):
+ return iter(self._slotlist)
+
+#
+# Queue subclass used by distributed pool
+#
+
+class SettableQueue(Queue.Queue):
+ def empty(self):
+ return not self.queue
+ def full(self):
+ return self.maxsize > 0 and len(self.queue) == self.maxsize
+ def set_contents(self, contents):
+ # length of contents must be at least as large as the number of
+ # threads which have potentially called get()
+ self.not_empty.acquire()
+ try:
+ self.queue.clear()
+ self.queue.extend(contents)
+ self.not_empty.notifyAll()
+ finally:
+ self.not_empty.release()
+
+Cluster.register('_SettableQueue', SettableQueue)
+
+#
+# Class representing a notional cpu in the cluster
+#
+
+class Slot(object):
+ def __init__(self, host):
+ self.host = host
+ self.Process = host.Process
+
+#
+# Host
+#
+
+class Host(object):
+ '''
+ Represents a host to use as a node in a cluster.
+
+ `hostname` gives the name of the host. If hostname is not
+ "localhost" then ssh is used to log in to the host. To log in as
+ a different user use a host name of the form
+ "username@somewhere.org"
+
+ `slots` is used to specify the number of slots for processes on
+ the host. This affects how often processes will be allocated to
+ this host. Normally this should be equal to the number of cpus on
+ that host.
+ '''
+ def __init__(self, hostname, slots=None):
+ self.hostname = hostname
+ self.slots = slots
+
+ def _start_manager(self, index, authkey, address, files):
+ if self.hostname != 'localhost':
+ tempdir = copy_to_remote_temporary_directory(self.hostname, files)
+ debug('startup files copied to %s:%s', self.hostname, tempdir)
+ p = subprocess.Popen(
+ ['ssh', self.hostname, 'python', '-c',
+ '"import os; os.chdir(%r); '
+ 'from distributing import main; main()"' % tempdir],
+ stdin=subprocess.PIPE
+ )
+ data = dict(
+ name='BootstrappingHost', index=index,
+ dist_log_level=_logger.getEffectiveLevel(),
+ dir=tempdir, authkey=str(authkey), parent_address=address
+ )
+ pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
+ p.stdin.close()
+
+#
+# Copy files to remote directory, returning name of directory
+#
+
+unzip_code = '''"
+import tempfile, os, sys, tarfile
+tempdir = tempfile.mkdtemp(prefix='distrib-')
+os.chdir(tempdir)
+tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
+for ti in tf:
+ tf.extract(ti)
+print tempdir
+"'''
+
+def copy_to_remote_temporary_directory(host, files):
+ p = subprocess.Popen(
+ ['ssh', host, 'python', '-c', unzip_code],
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE
+ )
+ tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
+ for name in files:
+ tf.add(name, os.path.basename(name))
+ tf.close()
+ p.stdin.close()
+ return p.stdout.read().rstrip()
+
+#
+# Code which runs a host manager
+#
+
+def main():
+ # get data from parent over stdin
+ data = pickle.load(sys.stdin)
+ sys.stdin.close()
+
+ # set some stuff
+ _logger.setLevel(data['dist_log_level'])
+ forking.prepare(data)
+
+ # create server for a `HostManager` object
+ server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
+ current_process()._server = server
+
+ # report server address and number of cpus back to parent
+ conn = connection.Client(data['parent_address'], authkey=data['authkey'])
+ conn.send((data['index'], server.address, slot_count))
+ conn.close()
+
+ # set name etc
+ current_process().set_name('Host-%s:%s' % server.address)
+ util._run_after_forkers()
+
+ # register a cleanup function
+ def cleanup(directory):
+ debug('removing directory %s', directory)
+ shutil.rmtree(directory)
+ debug('shutting down host manager')
+ util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
+
+ # start host manager
+ debug('remote host manager starting in %s', data['dir'])
+ server.serve_forever()
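A hypothetical usage sketch for the module above; the host name, user
name and slot count are placeholders, and the script is assumed to live
next to ``mp_distributing.py`` (not part of the commit)::

    from mp_distributing import Cluster, Host

    def square(x):
        return x * x

    if __name__ == '__main__':
        # One local host plus one ssh-reachable host contributing 4 slots.
        cluster = Cluster(
            [Host('localhost'), Host('user@remote.example.org', slots=4)],
            modules=[],
        )
        cluster.start()
        pool = cluster.Pool()        # a DistributedPool spanning all slots
        print pool.map(square, range(20))
        cluster.shutdown()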
101 Doc/includes/mp_newtype.py
@@ -0,0 +1,101 @@
+#
+# This module shows how to use arbitrary callables with a subclass of
+# `BaseManager`.
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+from multiprocessing import freeze_support
+from multiprocessing.managers import BaseManager, BaseProxy
+import operator
+
+##
+
+class Foo(object):
+ def f(self):
+ print 'you called Foo.f()'
+ def g(self):
+ print 'you called Foo.g()'
+ def _h(self):
+ print 'you called Foo._h()'
+
+# A simple generator function
+def baz():
+ for i in xrange(10):
+ yield i*i
+
+# Proxy type for generator objects
+class GeneratorProxy(BaseProxy):
+ _exposed_ = ('next', '__next__')
+ def __iter__(self):
+ return self
+ def next(self):
+ return self._callmethod('next')
+ def __next__(self):
+ return self._callmethod('__next__')
+
+# Function to return the operator module
+def get_operator_module():
+ return operator
+
+##
+
+class MyManager(BaseManager):
+ pass
+
+# register the Foo class; make `f()` and `g()` accessible via proxy
+MyManager.register('Foo1', Foo)
+
+# register the Foo class; make `g()` and `_h()` accessible via proxy
+MyManager.register('Foo2', Foo, exposed=('g', '_h'))
+
+# register the generator function baz; use `GeneratorProxy` to make proxies
+MyManager.register('baz', baz, proxytype=GeneratorProxy)
+
+# register get_operator_module(); make public functions accessible via proxy
+MyManager.register('operator', get_operator_module)
+
+##
+
+def test():
+ manager = MyManager()
+ manager.start()
+
+ print '-' * 20
+
+ f1 = manager.Foo1()
+ f1.f()
+ f1.g()
+ assert not hasattr(f1, '_h')
+ assert sorted(f1._exposed_) == sorted(['f', 'g'])
+
+ print '-' * 20
+
+ f2 = manager.Foo2()
+ f2.g()
+ f2._h()
+ assert not hasattr(f2, 'f')
+ assert sorted(f2._exposed_) == sorted(['g', '_h'])
+
+ print '-' * 20
+
+ it = manager.baz()
+ for i in it:
+ print '<%d>' % i,
+ print
+
+ print '-' * 20
+
+ op = manager.operator()
+ print 'op.add(23, 45) =', op.add(23, 45)
+ print 'op.pow(2, 94) =', op.pow(2, 94)
+ print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
+ print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
+ print 'op._exposed_ =', op._exposed_
+
+##
+
+if __name__ == '__main__':
+ freeze_support()
+ test()
314 Doc/includes/mp_pool.py
@@ -0,0 +1,314 @@
+#
+# A test of `multiprocessing.Pool` class
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+import multiprocessing
+import time
+import random
+import sys
+
+#
+# Functions used by test code
+#
+
+def calculate(func, args):
+ result = func(*args)
+ return '%s says that %s%s = %s' % (
+ multiprocessing.current_process().name,
+ func.__name__, args, result
+ )
+
+def calculatestar(args):
+ return calculate(*args)
+
+def mul(a, b):
+ time.sleep(0.5*random.random())
+ return a * b
+
+def plus(a, b):
+ time.sleep(0.5*random.random())
+ return a + b
+
+def f(x):
+ return 1.0 / (x-5.0)
+
+def pow3(x):
+ return x**3
+
+def noop(x):
+ pass
+
+#
+# Test code
+#
+
+def test():
+ print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
+
+ #
+ # Create pool
+ #
+
+ PROCESSES = 4
+ print 'Creating pool with %d processes\n' % PROCESSES
+ pool = multiprocessing.Pool(PROCESSES)
+ print 'pool = %s' % pool
+ print
+
+ #
+ # Tests
+ #
+
+ TASKS = [(mul, (i, 7)) for i in range(10)] + \
+ [(plus, (i, 8)) for i in range(10)]
+
+ results = [pool.apply_async(calculate, t) for t in TASKS]
+ imap_it = pool.imap(calculatestar, TASKS)
+ imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
+
+ print 'Ordered results using pool.apply_async():'
+ for r in results:
+ print '\t', r.get()
+ print
+
+ print 'Ordered results using pool.imap():'
+ for x in imap_it:
+ print '\t', x
+ print
+
+ print 'Unordered results using pool.imap_unordered():'
+ for x in imap_unordered_it:
+ print '\t', x
+ print
+
+ print 'Ordered results using pool.map() --- will block till complete:'
+ for x in pool.map(calculatestar, TASKS):
+ print '\t', x
+ print
+
+ #
+ # Simple benchmarks
+ #
+
+ N = 100000
+ print 'def pow3(x): return x**3'
+
+ t = time.time()
+ A = map(pow3, xrange(N))
+ print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
+ (N, time.time() - t)
+
+ t = time.time()
+ B = pool.map(pow3, xrange(N))
+ print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
+ (N, time.time() - t)
+
+ t = time.time()
+ C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
+ print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
+ ' seconds' % (N, N//8, time.time() - t)
+
+ assert A == B == C, (len(A), len(B), len(C))
+ print
+
+ L = [None] * 1000000
+ print 'def noop(x): pass'
+ print 'L = [None] * 1000000'
+
+ t = time.time()
+ A = map(noop, L)
+ print '\tmap(noop, L):\n\t\t%s seconds' % \
+ (time.time() - t)
+
+ t = time.time()
+ B = pool.map(noop, L)
+ print '\tpool.map(noop, L):\n\t\t%s seconds' % \
+ (time.time() - t)
+
+ t = time.time()
+ C = list(pool.imap(noop, L, chunksize=len(L)//8))
+ print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
+ (len(L)//8, time.time() - t)
+
+ assert A == B == C, (len(A), len(B), len(C))
+ print
+
+ del A, B, C, L
+
+ #
+ # Test error handling
+ #
+
+ print 'Testing error handling:'
+
+ try:
+ print pool.apply(f, (5,))
+ except ZeroDivisionError:
+ print '\tGot ZeroDivisionError as expected from pool.apply()'
+ else:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ try:
+ print pool.map(f, range(10))
+ except ZeroDivisionError:
+ print '\tGot ZeroDivisionError as expected from pool.map()'
+ else:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ try:
+ print list(pool.imap(f, range(10)))
+ except ZeroDivisionError:
+ print '\tGot ZeroDivisionError as expected from list(pool.imap())'
+ else:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ it = pool.imap(f, range(10))
+ for i in range(10):
+ try:
+ x = it.next()
+ except ZeroDivisionError:
+ if i == 5:
+ pass
+ except StopIteration:
+ break
+ else:
+ if i == 5:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ assert i == 9
+ print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
+ print
+
+ #
+ # Testing timeouts
+ #
+
+ print 'Testing ApplyResult.get() with timeout:',
+ res = pool.apply_async(calculate, TASKS[0])
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % res.get(0.02))
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print
+ print
+
+ print 'Testing IMapIterator.next() with timeout:',
+ it = pool.imap(calculatestar, TASKS)
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % it.next(0.02))
+ except StopIteration:
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print
+ print
+
+ #
+ # Testing callback
+ #
+
+ print 'Testing callback:'
+
+ A = []
+ B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
+
+ r = pool.apply_async(mul, (7, 8), callback=A.append)
+ r.wait()
+
+ r = pool.map_async(pow3, range(10), callback=A.extend)
+ r.wait()
+
+ if A == B:
+ print '\tcallbacks succeeded\n'
+ else:
+ print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
+
+ #
+ # Check there are no outstanding tasks
+ #
+
+ assert not pool._cache, 'cache = %r' % pool._cache
+
+ #
+ # Check close() methods
+ #
+
+ print 'Testing close():'
+
+ for worker in pool._pool:
+ assert worker.is_alive()
+
+ result = pool.apply_async(time.sleep, [0.5])
+ pool.close()
+ pool.join()
+
+ assert result.get() is None
+
+ for worker in pool._pool:
+ assert not worker.is_alive()
+
+ print '\tclose() succeeded\n'
+
+ #
+ # Check terminate() method
+ #
+
+ print 'Testing terminate():'
+
+ pool = multiprocessing.Pool(2)
+ DELTA = 0.1
+ ignore = pool.apply(pow3, [2])
+ results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
+ pool.terminate()
+ pool.join()
+
+ for worker in pool._pool:
+ assert not worker.is_alive()
+
+ print '\tterminate() succeeded\n'
+
+ #
+ # Check garbage collection
+ #
+
+ print 'Testing garbage collection:'
+
+ pool = multiprocessing.Pool(2)
+ DELTA = 0.1
+ processes = pool._pool
+ ignore = pool.apply(pow3, [2])
+ results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
+
+ results = pool = None
+
+ time.sleep(DELTA * 2)
+
+ for worker in processes:
+ assert not worker.is_alive()
+
+ print '\tgarbage collection succeeded\n'
+
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+
+ assert len(sys.argv) in (1, 2)
+
+ if len(sys.argv) == 1 or sys.argv[1] == 'processes':
+ print ' Using processes '.center(79, '-')
+ elif sys.argv[1] == 'threads':
+ print ' Using threads '.center(79, '-')
+ import multiprocessing.dummy as multiprocessing
+ else:
+ print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
+ raise SystemExit(2)
+
+ test()
276 Doc/includes/mp_synchronize.py
@@ -0,0 +1,276 @@
+#
+# A test file for the `multiprocessing` package
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+import time, sys, random
+from Queue import Empty
+
+import multiprocessing # may get overwritten
+
+
+#### TEST_VALUE
+
+def value_func(running, mutex):
+ random.seed()
+ time.sleep(random.random()*4)
+
+ mutex.acquire()
+ print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
+ running.value -= 1
+ mutex.release()
+
+def test_value():
+ TASKS = 10
+ running = multiprocessing.Value('i', TASKS)
+ mutex = multiprocessing.Lock()
+
+ for i in range(TASKS):
+ p = multiprocessing.Process(target=value_func, args=(running, mutex))
+ p.start()
+
+ while running.value > 0:
+ time.sleep(0.08)
+ mutex.acquire()
+ print running.value,
+ sys.stdout.flush()
+ mutex.release()
+
+ print
+ print 'No more running processes'
+
+
+#### TEST_QUEUE
+
+def queue_func(queue):
+ for i in range(30):
+ time.sleep(0.5 * random.random())
+ queue.put(i*i)
+ queue.put('STOP')
+
+def test_queue():
+ q = multiprocessing.Queue()
+
+ p = multiprocessing.Process(target=queue_func, args=(q,))
+ p.start()
+
+ o = None
+ while o != 'STOP':
+ try:
+ o = q.get(timeout=0.3)
+ print o,
+ sys.stdout.flush()
+ except Empty:
+ print 'TIMEOUT'
+
+ print
+
+
+#### TEST_CONDITION
+
+def condition_func(cond):
+ cond.acquire()
+ print '\t' + str(cond)
+ time.sleep(2)
+ print '\tchild is notifying'
+ print '\t' + str(cond)
+ cond.notify()
+ cond.release()
+
+def test_condition():
+ cond = multiprocessing.Condition()
+
+ p = multiprocessing.Process(target=condition_func, args=(cond,))
+ print cond
+
+ cond.acquire()
+ print cond
+ cond.acquire()
+ print cond
+
+ p.start()
+
+ print 'main is waiting'
+ cond.wait()
+ print 'main has woken up'
+
+ print cond
+ cond.release()
+ print cond
+ cond.release()
+
+ p.join()
+ print cond
+
+
+#### TEST_SEMAPHORE
+
+def semaphore_func(sema, mutex, running):
+ sema.acquire()
+
+ mutex.acquire()
+ running.value += 1
+ print running.value, 'tasks are running'
+ mutex.release()
+
+ random.seed()
+ time.sleep(random.random()*2)
+
+ mutex.acquire()
+ running.value -= 1
+ print '%s has finished' % multiprocessing.current_process()
+ mutex.release()
+
+ sema.release()
+
+def test_semaphore():
+ sema = multiprocessing.Semaphore(3)
+ mutex = multiprocessing.RLock()
+ running = multiprocessing.Value('i', 0)
+
+ processes = [
+ multiprocessing.Process(target=semaphore_func,
+ args=(sema, mutex, running))
+ for i in range(10)
+ ]
+
+ for p in processes:
+ p.start()
+
+ for p in processes:
+ p.join()
+
+
+#### TEST_JOIN_TIMEOUT
+
+def join_timeout_func():
+ print '\tchild sleeping'
+ time.sleep(5.5)
+ print '\n\tchild terminating'
+
+def test_join_timeout():
+ p = multiprocessing.Process(target=join_timeout_func)
+ p.start()
+
+ print 'waiting for process to finish'
+
+ while 1:
+ p.join(timeout=1)
+ if not p.is_alive():
+ break
+ print '.',
+ sys.stdout.flush()
+
+
+#### TEST_EVENT
+
+def event_func(event):
+ print '\t%r is waiting' % multiprocessing.current_process()
+ event.wait()
+ print '\t%r has woken up' % multiprocessing.current_process()
+
+def test_event():
+ event = multiprocessing.Event()
+
+ processes = [multiprocessing.Process(target=event_func, args=(event,))
+ for i in range(5)]
+
+ for p in processes:
+ p.start()
+
+ print 'main is sleeping'
+ time.sleep(2)
+
+ print 'main is setting event'
+ event.set()
+
+ for p in processes:
+ p.join()
+
+
+#### TEST_SHAREDVALUES
+
+def sharedvalues_func(values, arrays, shared_values, shared_arrays):
+ for i in range(len(values)):
+ v = values[i][1]
+ sv = shared_values[i].value
+ assert v == sv
+
+ for i in range(len(values)):
+ a = arrays[i][1]
+ sa = list(shared_arrays[i][:])
+ assert a == sa
+
+ print 'Tests passed'
+
+def test_sharedvalues():
+ values = [
+ ('i', 10),
+ ('h', -2),
+ ('d', 1.25)
+ ]
+ arrays = [
+ ('i', range(100)),
+ ('d', [0.25 * i for i in range(100)]),
+ ('H', range(1000))
+ ]
+
+ shared_values = [multiprocessing.Value(id, v) for id, v in values]
+ shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
+
+ p = multiprocessing.Process(
+ target=sharedvalues_func,
+ args=(values, arrays, shared_values, shared_arrays)
+ )
+ p.start()
+ p.join()
+
+ assert p.exitcode == 0
+
+
+####
+
+def test(namespace=multiprocessing):
+ global multiprocessing
+
+ multiprocessing = namespace
+
+ for func in [ test_value, test_queue, test_condition,
+ test_semaphore, test_join_timeout, test_event,
+ test_sharedvalues ]:
+
+ print '\n\t######## %s\n' % func.__name__
+ func()
+
+ ignore = multiprocessing.active_children() # cleanup any old processes
+ if hasattr(multiprocessing, '_debug_info'):
+ info = multiprocessing._debug_info()
+ if info:
+ print info
+ raise ValueError, 'there should be no positive refcounts left'
+
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+
+ assert len(sys.argv) in (1, 2)
+
+ if len(sys.argv) == 1 or sys.argv[1] == 'processes':
+ print ' Using processes '.center(79, '-')
+ namespace = multiprocessing
+ elif sys.argv[1] == 'manager':
+ print ' Using processes and a manager '.center(79, '-')
+ namespace = multiprocessing.Manager()
+ namespace.Process = multiprocessing.Process
+ namespace.current_process = multiprocessing.current_process
+ namespace.active_children = multiprocessing.active_children
+ elif sys.argv[1] == 'threads':
+ print ' Using threads '.center(79, '-')
+ import multiprocessing.dummy as namespace
+ else:
+ print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
+ raise SystemExit, 2
+
+ test(namespace)
70 Doc/includes/mp_webserver.py
@@ -0,0 +1,70 @@
+#
+# Example where a pool of http servers share a single listening socket
+#
+# On Windows this module depends on the ability to pickle a socket
+# object so that the worker processes can inherit a copy of the server
+# object. (We import `multiprocessing.reduction` to enable this pickling.)
+#
+# Not sure if we should synchronize access to `socket.accept()` method by
+# using a process-shared lock -- does not seem to be necessary.
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+import os
+import sys
+
+from multiprocessing import Process, current_process, freeze_support
+from BaseHTTPServer import HTTPServer
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+
+if sys.platform == 'win32':
+ import multiprocessing.reduction # make sockets picklable/inheritable
+
+
+def note(format, *args):
+ sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
+
+
+class RequestHandler(SimpleHTTPRequestHandler):
+ # we override log_message() to show which process is handling the request
+ def log_message(self, format, *args):
+ note(format, *args)
+
+def serve_forever(server):
+ note('starting server')
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ pass
+
+
+def runpool(address, number_of_processes):
+ # create a single server object -- children will each inherit a copy
+ server = HTTPServer(address, RequestHandler)
+
+ # create child processes to act as workers
+ for i in range(number_of_processes-1):
+ Process(target=serve_forever, args=(server,)).start()
+
+ # main process also acts as a worker
+ serve_forever(server)
+
+
+def test():
+ DIR = os.path.join(os.path.dirname(__file__), '..')
+ ADDRESS = ('localhost', 8000)
+ NUMBER_OF_PROCESSES = 4
+
+ print 'Serving at http://%s:%d using %d worker processes' % \
+ (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
+ print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
+
+ os.chdir(DIR)
+ runpool(ADDRESS, NUMBER_OF_PROCESSES)
+
+
+if __name__ == '__main__':
+ freeze_support()
+ test()
90 Doc/includes/mp_workers.py
@@ -0,0 +1,90 @@
+#
+# Simple example which uses a pool of workers to carry out some tasks.
+#
+# Notice that the results will probably not come out of the output
+# queue in the same order as the corresponding tasks were
+# put on the input queue. If it is important to get the results back
+# in the original order then consider using `Pool.map()` or
+# `Pool.imap()` (which will save on the amount of code needed anyway).
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+import time
+import random
+
+from multiprocessing import Process, Queue, current_process, freeze_support
+
+#
+# Function run by worker processes
+#
+
+def worker(input, output):
+ for func, args in iter(input.get, 'STOP'):
+ result = calculate(func, args)
+ output.put(result)
+
+#
+# Function used to calculate result
+#
+
+def calculate(func, args):
+ result = func(*args)
+ return '%s says that %s%s = %s' % \
+ (current_process().name, func.__name__, args, result)
+
+#
+# Functions referenced by tasks
+#
+
+def mul(a, b):
+ time.sleep(0.5*random.random())
+ return a * b
+
+def plus(a, b):
+ time.sleep(0.5*random.random())
+ return a + b
+
+#
+#
+#
+
+def test():
+ NUMBER_OF_PROCESSES = 4
+ TASKS1 = [(mul, (i, 7)) for i in range(20)]
+ TASKS2 = [(plus, (i, 8)) for i in range(10)]
+
+ # Create queues
+ task_queue = Queue()
+ done_queue = Queue()
+
+ # Submit tasks
+ for task in TASKS1:
+ task_queue.put(task)
+
+ # Start worker processes
+ for i in range(NUMBER_OF_PROCESSES):
+ Process(target=worker, args=(task_queue, done_queue)).start()
+
+ # Get and print results
+ print 'Unordered results:'
+ for i in range(len(TASKS1)):
+ print '\t', done_queue.get()
+
+ # Add more tasks using `put()`
+ for task in TASKS2:
+ task_queue.put(task)
+
+ # Get and print some more results
+ for i in range(len(TASKS2)):
+ print '\t', done_queue.get()
+
+ # Tell child processes to stop
+ for i in range(NUMBER_OF_PROCESSES):
+ task_queue.put('STOP')
+
+
+if __name__ == '__main__':
+ freeze_support()
+ test()
22 Doc/index.rst
@@ -0,0 +1,22 @@
+.. multiprocessing documentation master file, created by sphinx-quickstart on Wed Nov 26 12:47:00 2008.
+ You can adapt this file completely to your liking, but it should at least
+ contain the root `toctree` directive.
+
+Welcome to multiprocessing's documentation!
+===========================================
+
+Contents:
+
+.. toctree::
+
+ library/multiprocessing.rst
+ glossary.rst
+
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
+
2,230 Doc/library/multiprocessing.rst
2,230 additions, 0 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
91 INSTALL.txt
@@ -0,0 +1,91 @@
+.. default-role:: literal
+
+================================
+ Installation of multiprocessing
+================================
+
+Versions earlier than Python 2.4 are not supported. If you are using
+Python 2.4 then you must install the `ctypes` package (which comes
+automatically with Python 2.5). Users of Python 2.4 on Windows
+also need to install the `pywin32` package.
+
+On Unix it's highly recommended to use Python 2.5.3 (not yet released) or
+apply the ``fork-thread-patch-2`` patch from `Issue 1683
+<http://bugs.python.org/issue1683>`_.
+
+Windows binary builds for Python 2.4 and Python 2.5 are available at
+
+ http://pypi.python.org/pypi/multiprocessing
+
+Python 2.6 and newer versions already come with multiprocessing. Although
+the standalone variant of the multiprocessing package is kept compatible
+with 2.6, you must not install it with Python 2.6.
+
+Otherwise, if you have the correct C compiler setup then the source
+distribution can be installed the usual way::
+
+ python setup.py install
+
+It should not be necessary to do any editing of `setup.py` if you are
+using Windows, Mac OS X or Linux. On other unices it may be necessary
+to modify the values of the `macros` dictionary or `libraries` list.
+The section to modify reads ::
+
+ else:
+ macros = dict(
+ HAVE_SEM_OPEN=1,
+ HAVE_SEM_TIMEDWAIT=1,
+ HAVE_FD_TRANSFER=1
+ )
+ libraries = ['rt']
+
+More details can be found in the comments in `setup.py`.
+
+Note that if you use `HAVE_SEM_OPEN=0` then support for posix
+semaphores will not be compiled in, and many of the functions in the
+`multiprocessing` namespace, such as `Lock()` and `Queue()`, will not be
+available. However, one can still create a manager using `manager =
+multiprocessing.Manager()` and then do `lock = manager.Lock()` etc.
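+
+For example, with only the manager available, locks and queues can still
+be obtained as proxy objects (a minimal sketch)::
+
+    import multiprocessing
+
+    manager = multiprocessing.Manager()
+    lock = manager.Lock()    # proxy for a lock living in the manager process
+    queue = manager.Queue()  # works without native posix semaphore support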
+
+
+Running tests
+-------------
+
+To run the test scripts using Python 2.5 do ::
+
+ python -m multiprocessing.tests
+
+and on Python 2.4 do ::
+
+ python -c "from multiprocessing.tests import main; main()"
+
+The sources also come with a Makefile. To run the unit tests with the
+Makefile using Python 2.5 do ::
+
+ make test
+
+using another version of Python do ::
+
+ make test PYTHON=python2.4
+
+This will run a number of test scripts using both processes and threads.
+
+
+Running examples
+----------------
+
+The make target `examples` runs several example scripts.
+
+
+Building docs
+-------------
+
+To build the standalone documentation you need Sphinx 0.5 and setuptools
+0.6c9 or newer. Both are available at http://pypi.python.org/. With
+setuptools installed, do ::
+
+ sudo easy_install-2.5 "Sphinx>=0.5"
+ make doc
+
+The docs end up in ``build/sphinx/builder_name``.
+
29 LICENSE.txt
@@ -0,0 +1,29 @@
+Copyright (c) 2006-2008, R Oudkerk and Contributors
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3. Neither the name of author nor the names of any contributors may be
+ used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.
+
278 Lib/mpfix/__init__.py
@@ -0,0 +1,278 @@
+#
+# Package analogous to 'threading.py' but using processes
+#
+# multiprocessing/__init__.py
+#
+# This package is intended to duplicate the functionality (and much of
+# the API) of threading.py but uses processes instead of threads. A
+# subpackage 'multiprocessing.dummy' has the same API but is a simple
+# wrapper for 'threading'.
+#
+# Try calling `multiprocessing.doc.main()` to read the html
+# documentation in a webbrowser.
+#
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+
+__version__ = '2.6.2.1'
+
+__all__ = [
+ 'Process', 'current_process', 'active_children', 'freeze_support',
+ 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
+ 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
+ 'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
+ 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+ ]
+
+__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
+
+#
+# Imports
+#
+
+import os
+import sys
+
+# backward compatibility patch for Python 2.4 and 2.5
+import multiprocessing.patch
+multiprocessing.patch.monkey()
+
+from multiprocessing.process import Process, current_process, active_children
+from multiprocessing.util import SUBDEBUG, SUBWARNING
+
+#
+# Exceptions
+#
+
+class ProcessError(Exception):
+ pass
+
+class BufferTooShort(ProcessError):
+ pass
+
+class TimeoutError(ProcessError):
+ pass
+
+class AuthenticationError(ProcessError):
+ pass
+
+# This is down here because _multiprocessing uses BufferTooShort
+import _multiprocessing
+# alias for forward compatibility
+sys.modules['_multiprocessing'] = _multiprocessing
+
+#
+# Definitions not depending on native semaphores
+#
+
+def Manager():
+ '''
+ Returns a manager associated with a running server process
+
+ The manager's methods such as `Lock()`, `Condition()` and `Queue()`
+ can be used to create shared objects.
+ '''
+ from multiprocessing.managers import SyncManager
+ m = SyncManager()
+ m.start()
+ return m
+
+def Pipe(duplex=True):
+ '''
+ Returns two connection objects connected by a pipe
+ '''
+ from multiprocessing.connection import Pipe
+ return Pipe(duplex)
+
+def cpu_count():
+ '''
+ Returns the number of CPUs in the system
+ '''
+ if sys.platform == 'win32':
+ try:
+ num = int(os.environ['NUMBER_OF_PROCESSORS'])
+ except (ValueError, KeyError):
+ num = 0
+ elif 'bsd' in sys.platform or sys.platform == 'darwin':
+ try:
+ num = int(os.popen('sysctl -n hw.ncpu').read())
+ except ValueError:
+ num = 0
+ else:
+ try:
+ num = os.sysconf('SC_NPROCESSORS_ONLN')
+ except (ValueError, OSError, AttributeError):
+ num = 0
+
+ if num >= 1:
+ return num
+ else:
+ raise NotImplementedError('cannot determine number of cpus')
+
+def freeze_support():
+ '''
+ Check whether this is a fake forked process in a frozen executable.
+ If so then run code specified by commandline and exit.
+ '''
+ if sys.platform == 'win32' and getattr(sys, 'frozen', False):
+ from multiprocessing.forking import freeze_support
+ freeze_support()
+
+def get_logger():
+ '''
+ Return package logger -- if it does not already exist then it is created
+ '''
+ from multiprocessing.util import get_logger
+ return get_logger()
+
+def log_to_stderr(level=None):
+ '''
+ Turn on logging and add a handler which prints to stderr
+ '''
+ from multiprocessing.util import log_to_stderr
+ return log_to_stderr(level)
+
+def allow_connection_pickling():
+ '''
+ Install support for sending connections and sockets between processes
+ '''
+ from multiprocessing import reduction
+
+#
+# Definitions depending on native semaphores
+#
+
+def Lock():
+ '''
+ Returns a non-recursive lock object
+ '''
+ from multiprocessing.synchronize import Lock
+ return Lock()
+
+def RLock():
+ '''
+ Returns a recursive lock object
+ '''
+ from multiprocessing.synchronize import RLock
+ return RLock()
+
+def Condition(lock=None):
+ '''
+ Returns a condition object
+ '''
+ from multiprocessing.synchronize import Condition
+ return Condition(lock)
+
+def Semaphore(value=1):
+ '''
+ Returns a semaphore object
+ '''
+ from multiprocessing.synchronize import Semaphore
+ return Semaphore(value)
+
+def BoundedSemaphore(value=1):
+ '''
+ Returns a bounded semaphore object
+ '''
+ from multiprocessing.synchronize import BoundedSemaphore
+ return BoundedSemaphore(value)
+
+def Event():
+ '''
+ Returns an event object
+ '''
+ from multiprocessing.synchronize import Event
+ return Event()
+
+def Queue(maxsize=0):
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import Queue
+ return Queue(maxsize)
+
+def JoinableQueue(maxsize=0):
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import JoinableQueue
+ return JoinableQueue(maxsize)
+
+def Pool(processes=None, initializer=None, initargs=()):
+ '''
+ Returns a process pool object
+ '''
+ from multiprocessing.pool import Pool
+ return Pool(processes, initializer, initargs)
+
+def RawValue(typecode_or_type, *args):
+ '''
+ Returns a shared object
+ '''
+ from multiprocessing.sharedctypes import RawValue
+ return RawValue(typecode_or_type, *args)
+
+def RawArray(typecode_or_type, size_or_initializer):
+ '''
+ Returns a shared array
+ '''
+ from multiprocessing.sharedctypes import RawArray
+ return RawArray(typecode_or_type, size_or_initializer)
+
+def Value(typecode_or_type, *args, **kwds):
+ '''
+ Returns a synchronized shared object
+ '''
+ from multiprocessing.sharedctypes import Value
+ return Value(typecode_or_type, *args, **kwds)
+
+def Array(typecode_or_type, size_or_initializer, **kwds):
+ '''
+ Returns a synchronized shared array
+ '''
+ from multiprocessing.sharedctypes import Array
+ return Array(typecode_or_type, size_or_initializer, **kwds)
+
+#
+#
+#
+
+if sys.platform == 'win32':
+
+ def set_executable(executable):
+ '''
+ Sets the path to a python.exe or pythonw.exe binary used to run
+ child processes on Windows instead of sys.executable.
+ Useful for people embedding Python.
+ '''
+ from multiprocessing.forking import set_executable
+ set_executable(executable)
+
+ __all__ += ['set_executable']
419 Lib/mpfix/connection.py
@@ -0,0 +1,419 @@
+#
+# A higher level module for using sockets (or Windows named pipes)
+#
+# multiprocessing/connection.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = [ 'Client', 'Listener', 'Pipe' ]
+
+import os
+import sys
+import socket
+import errno
+import time
+import tempfile
+import itertools
+
+import _multiprocessing
+from multiprocessing import current_process, AuthenticationError
+from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
+from multiprocessing.forking import duplicate, close
+
+# forward compatibility
+bytes = str
+
+#
+#
+#
+
+BUFSIZE = 8192
+
+_mmap_counter = itertools.count()
+
+default_family = 'AF_INET'
+families = ['AF_INET']
+
+if hasattr(socket, 'AF_UNIX'):
+ default_family = 'AF_UNIX'
+ families += ['AF_UNIX']
+
+if sys.platform == 'win32':
+ default_family = 'AF_PIPE'
+ families += ['AF_PIPE']
+
+#
+#
+#
+
+def arbitrary_address(family):
+ '''
+ Return an arbitrary free address for the given family
+ '''
+ if family == 'AF_INET':
+ return ('localhost', 0)
+ elif family == 'AF_UNIX':
+ return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+ elif family == 'AF_PIPE':
+ return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
+ (os.getpid(), _mmap_counter.next()))
+ else:
+ raise ValueError('unrecognized family')
+
+
+def address_type(address):
+ '''
+ Return the type of the address
+
+ This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
+ '''
+ if type(address) == tuple:
+ return 'AF_INET'
+ elif type(address) is str and address.startswith('\\\\'):
+ return 'AF_PIPE'
+ elif type(address) is str:
+ return 'AF_UNIX'
+ else:
+ raise ValueError('address type of %r unrecognized' % address)
+
+#
+# Public functions
+#
+
+class Listener(object):
+ '''
+ Returns a listener object.
+
+ This is a wrapper for a bound socket which is 'listening' for
+ connections, or for a Windows named pipe.
+ '''
+ def __init__(self, address=None, family=None, backlog=1, authkey=None):
+ family = family or (address and address_type(address)) \
+ or default_family
+ address = address or arbitrary_address(family)
+
+ if family == 'AF_PIPE':
+ self._listener = PipeListener(address, backlog)
+ else:
+ self._listener = SocketListener(address, family, backlog)
+
+ if authkey is not None and not isinstance(authkey, bytes):
+ raise TypeError, 'authkey should be a byte string'
+
+ self._authkey = authkey
+
+ def accept(self):
+ '''
+ Accept a connection on the bound socket or named pipe of `self`.
+
+ Returns a `Connection` object.
+ '''
+ c = self._listener.accept()
+ if self._authkey:
+ deliver_challenge(c, self._authkey)
+ answer_challenge(c, self._authkey)
+ return c
+
+ def close(self):
+ '''
+ Close the bound socket or named pipe of `self`.
+ '''
+ return self._listener.close()
+
+ address = property(lambda self: self._listener._address)
+ last_accepted = property(lambda self: self._listener._last_accepted)
+
+
+def Client(address, family=None, authkey=None):
+ '''
+ Returns a connection to the address of a `Listener`
+ '''
+ family = family or address_type(address)
+ if family == 'AF_PIPE':
+ c = PipeClient(address)
+ else:
+ c = SocketClient(address)
+
+ if authkey is not None and not isinstance(authkey, bytes):
+ raise TypeError, 'authkey should be a byte string'
+
+ if authkey is not None:
+ answer_challenge(c, authkey)
+ deliver_challenge(c, authkey)
+
+ return c
+
+
+if sys.platform != 'win32':
+
+ def Pipe(duplex=True):
+ '''
+ Returns pair of connection objects at either end of a pipe
+ '''
+ if duplex:
+ s1, s2 = socket.socketpair()
+ c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
+ c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
+ s1.close()
+ s2.close()
+ else:
+ fd1, fd2 = os.pipe()
+ c1 = _multiprocessing.Connection(fd1, writable=False)
+ c2 = _multiprocessing.Connection(fd2, readable=False)
+
+ return c1, c2
+
+else:
+
+ from multiprocessing._multiprocessing import win32
+
+ def Pipe(duplex=True):
+ '''
+ Returns pair of connection objects at either end of a pipe
+ '''
+ address = arbitrary_address('AF_PIPE')
+ if duplex:
+ openmode = win32.PIPE_ACCESS_DUPLEX
+ access = win32.GENERIC_READ | win32.GENERIC_WRITE
+ obsize, ibsize = BUFSIZE, BUFSIZE
+ else:
+ openmode = win32.PIPE_ACCESS_INBOUND
+ access = win32.GENERIC_WRITE
+ obsize, ibsize = 0, BUFSIZE
+
+ h1 = win32.CreateNamedPipe(
+ address, openmode,
+ win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
+ win32.PIPE_WAIT,
+ 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ )
+ h2 = win32.CreateFile(
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ )
+ win32.SetNamedPipeHandleState(
+ h2, win32.PIPE_READMODE_MESSAGE, None, None
+ )
+
+ try:
+ win32.ConnectNamedPipe(h1, win32.NULL)
+ except WindowsError, e:
+ if e.args[0] != win32.ERROR_PIPE_CONNECTED:
+ raise
+
+ c1 = _mult