Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

file 152 lines (109 sloc) 3.508 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import logging
import os
import time

from kombu.utils.encoding import safe_repr

from celery.utils import timer2
from celery.utils.log import get_logger

# Module-level logger; BasePool uses it for its debug-level check and
# for the debug message emitted by apply_async().
logger = get_logger("celery.concurrency")


def apply_target(target, args=(), kwargs=None, callback=None,
                 accept_callback=None, pid=None, **_):
    """Call ``target(*args, **kwargs)`` and pass the result to ``callback``.

    :param target: callable to execute.
    :param args: positional arguments for ``target``.
    :param kwargs: keyword arguments for ``target``; ``None`` means no
        keyword arguments.
    :param callback: optional callable invoked with the return value of
        ``target``.  Skipped when ``None``.
    :param accept_callback: optional callable invoked *before* ``target``
        runs, with the process id and the current ``time.time()`` value.
    :param pid: process id reported to ``accept_callback``.  A falsy value
        (``None`` or ``0``) falls back to ``os.getpid()``.

    Extra keyword arguments are accepted and ignored.  Exceptions raised
    by ``target`` or by either callback are not caught here.
    """
    # Use a fresh dict instead of a shared mutable default.
    if kwargs is None:
        kwargs = {}
    if accept_callback:
        accept_callback(pid or os.getpid(), time.time())
    result = target(*args, **kwargs)
    # ``callback`` defaults to None, so it must not be called blindly.
    if callback is not None:
        callback(result)


class BasePool(object):
    """Base class for pool implementations.

    Tracks a simple lifecycle state (:attr:`RUN`, :attr:`CLOSE`,
    :attr:`TERMINATE`) and delegates the actual work to ``on_*`` hook
    methods.  Every hook defined here is a no-op, serving as an extension
    point for concrete pools.
    """

    # Lifecycle states stored in ``_state``.
    RUN = 0x1
    CLOSE = 0x2
    TERMINATE = 0x3

    Timer = timer2.Timer

    #: set to true if the pool can be shutdown from within
    #: a signal handler.
    signal_safe = True

    #: set to true if pool supports rate limits.
    #: (this is here for gevent, which currently does not implement
    #: the necessary timers).
    rlimit_safe = True

    #: set to true if pool requires the use of a mediator
    #: thread (e.g. if applying new items can block the current thread).
    requires_mediator = False

    #: set to true if pool uses greenlets.
    is_green = False

    # ``_state`` holds one of RUN/CLOSE/TERMINATE once a lifecycle method
    # has been called; ``None`` until then.
    _state = None
    _pool = None

    #: only used by multiprocessing pool
    uses_semaphore = False

    def __init__(self, limit=None, putlocks=True, **options):
        """Store the pool configuration.

        :param limit: value reported by :attr:`num_processes`.
        :param putlocks: forwarded to :meth:`on_apply` as ``waitforslot``
            by :meth:`apply_async`.
        :param options: remaining keyword arguments, kept on
            ``self.options``.
        """
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        # Evaluated once so apply_async() can skip building the debug
        # message (and the safe_repr calls) when debug logging is off.
        self._does_debug = logger.isEnabledFor(logging.DEBUG)

    def on_start(self):
        """Hook called by :meth:`start`; no-op by default."""
        pass

    def did_start_ok(self):
        """Return ``True``; this base implementation always reports success."""
        return True

    def on_stop(self):
        """Hook called by :meth:`stop`; no-op by default."""
        pass

    def on_apply(self, *args, **kwargs):
        """Hook called by :meth:`apply_async`; no-op by default.

        Receives ``(target, args, kwargs)`` positionally plus
        ``waitforslot`` and any extra options as keyword arguments.
        """
        pass

    def on_terminate(self):
        """Hook called by :meth:`terminate`; no-op by default."""
        pass

    def on_soft_timeout(self, job):
        """Soft-timeout hook for ``job``; no-op by default."""
        pass

    def on_hard_timeout(self, job):
        """Hard-timeout hook for ``job``; no-op by default."""
        pass

    def maintain_pool(self, *args, **kwargs):
        """Pool maintenance hook; accepts any arguments, no-op by default."""
        pass

    def terminate_job(self, pid):
        """Not implemented by the base pool.

        :raises NotImplementedError: always.
        """
        # The message names this method (it previously referred to a
        # non-existent ``kill_job``).
        raise NotImplementedError(
                "%s does not implement terminate_job" % (self.__class__, ))

    def restart(self):
        """Not implemented by the base pool.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
                "%s does not implement restart" % (self.__class__, ))

    def stop(self):
        """Run :meth:`on_stop`, then mark the pool as terminated."""
        self.on_stop()
        self._state = self.TERMINATE

    def terminate(self):
        """Mark the pool as terminated, then run :meth:`on_terminate`."""
        self._state = self.TERMINATE
        self.on_terminate()

    def start(self):
        """Run :meth:`on_start`, then mark the pool as running."""
        self.on_start()
        self._state = self.RUN

    def close(self):
        """Mark the pool as closed, then run :meth:`on_close`."""
        self._state = self.CLOSE
        self.on_close()

    def on_close(self):
        """Hook called by :meth:`close`; no-op by default."""
        pass

    def init_callbacks(self, **kwargs):
        """Callback initialisation hook; no-op by default."""
        pass

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.

        :param target: callable handed to :meth:`on_apply`.
        :param args: positional arguments; a new empty list when omitted.
        :param kwargs: keyword arguments; a new empty dict when omitted.
        :param options: forwarded unchanged to :meth:`on_apply`.
        :returns: whatever :meth:`on_apply` returns.
        """
        # Fresh containers per call: these objects are passed on to
        # on_apply(), so a shared default could leak state between calls.
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}

        if self._does_debug:
            logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
                         target, safe_repr(args), safe_repr(kwargs))

        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             **options)

    def _get_info(self):
        """Return the dict exposed by :attr:`info`; empty by default."""
        return {}

    @property
    def info(self):
        """Result of :meth:`_get_info`."""
        return self._get_info()

    @property
    def active(self):
        """``True`` while the pool state is :attr:`RUN`."""
        return self._state == self.RUN

    @property
    def num_processes(self):
        """The ``limit`` passed to the constructor."""
        return self.limit

    @property
    def readers(self):
        """Empty dict in the base pool."""
        return {}

    @property
    def writers(self):
        """Empty dict in the base pool."""
        return {}

    @property
    def timers(self):
        """Empty dict in the base pool."""
        return {}
Something went wrong with that request. Please try again.