-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
autoscale.py
154 lines (123 loc) · 4.49 KB
/
autoscale.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Pool Autoscaling.
This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.
The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
"""
import os
import threading
from time import monotonic, sleep
from kombu.asynchronous.semaphore import DummyLock
from celery import bootsteps
from celery.utils.log import get_logger
from celery.utils.threads import bgThread
from . import state
from .components import Pool
# Names exported by ``from <this module> import *``.
__all__ = ('Autoscaler', 'WorkerComponent')

logger = get_logger(__name__)

# Module-level shortcuts to the logger's bound methods.
debug = logger.debug
info = logger.info
error = logger.error

# Default ``keepalive`` for :class:`Autoscaler`, read once at import time
# from the ``AUTOSCALE_KEEPALIVE`` environment variable (30 when unset).
# It is compared against ``time.monotonic()`` differences, i.e. seconds.
AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
class WorkerComponent(bootsteps.StartStopStep):
    """Bootstep that starts the autoscaler thread/timer in the worker.

    The step depends on the :class:`Pool` step and is enabled according
    to the worker's ``autoscale`` attribute.
    """

    label = 'Autoscaler'
    conditional = True
    requires = (Pool,)

    def __init__(self, w, **kwargs):
        """Record whether autoscaling is on and reset ``w.autoscaler``."""
        self.enabled = w.autoscale
        # Make sure the attribute exists even before `create` has run.
        w.autoscaler = None

    def create(self, w):
        """Build the autoscaler and attach it to the worker.

        When the worker uses the event loop the autoscaler is given a
        ``DummyLock`` as mutex and ``None`` is returned; scaling is then
        driven by the callbacks installed in `register_with_event_loop`.
        Otherwise the mutex is left to the autoscaler's default and the
        autoscaler instance itself is returned.
        """
        lock = DummyLock() if w.use_eventloop else None
        autoscaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=lock,
        )
        w.autoscaler = autoscaler
        if w.use_eventloop:
            return None
        return autoscaler

    def register_with_event_loop(self, w, hub):
        """Hook ``maybe_scale`` into task-message events and a hub timer.

        ``maybe_scale`` is added to the consumer's ``on_task_message``
        callbacks and additionally scheduled through
        ``hub.call_repeatedly`` with the autoscaler's ``keepalive`` as
        the first argument.
        """
        autoscaler = w.autoscaler
        w.consumer.on_task_message.add(autoscaler.maybe_scale)
        hub.call_repeatedly(autoscaler.keepalive, autoscaler.maybe_scale)

    def info(self, w):
        """Return `Autoscaler` info."""
        stats = w.autoscaler.info()
        return {'autoscaler': stats}
class Autoscaler(bgThread):
    """Background thread to autoscale pool workers.

    The desired pool size follows the number of currently reserved
    requests (`qty`), clamped to the range
    ``[min_concurrency, max_concurrency]``.

    Arguments:
        pool: Pool object exposing ``grow(n)``, ``shrink(n)``,
            ``maintain_pool()`` and ``num_processes``.
        max_concurrency (int): Upper bound used when scaling up.
        min_concurrency (int): Lower bound used when scaling down.
        worker: Optional worker whose ``consumer`` has its prefetch count
            adjusted when the maximum changes through `update`.
        keepalive (float): Time, in ``time.monotonic()`` units (seconds),
            that must pass after the last scale-up before the pool may be
            scaled down.  Must be truthy.
        mutex: Lock guarding `body` and `update`; a fresh
            ``threading.Lock`` is created when omitted (or falsy).
    """

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super().__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        # Monotonic timestamp of the last `scale_up`; None until the first.
        self._last_scale_up = None
        self.worker = worker

        # NOTE: kept as an ``assert`` so callers still see AssertionError;
        # be aware the check disappears when Python runs with ``-O``.
        assert self.keepalive, 'cannot scale down too fast.'

    def body(self):
        """Run one scaling pass under the mutex, then sleep one second.

        Presumably called repeatedly by the ``bgThread`` run loop.
        """
        with self.mutex:
            self.maybe_scale()
        # Sleep outside the lock so `update` is not blocked meanwhile.
        sleep(1.0)

    def _maybe_scale(self, req=None):
        """Scale the pool towards the current demand.

        ``req`` is unused; it is accepted so the public `maybe_scale`
        can be registered as a callback that receives an argument.

        Returns:
            True when `scale_up` or `scale_down` was invoked (note that
            `scale_down` may still decline to shrink), otherwise None.
        """
        procs = self.processes
        # Demand above the current size, capped by the maximum: grow.
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        # Demand below the current size, floored by the minimum: shrink.
        cur = max(self.qty, self.min_concurrency)
        if cur < procs:
            self.scale_down(procs - cur)
            return True
        return None

    def maybe_scale(self, req=None):
        """Scale if needed and ask the pool to maintain itself afterwards."""
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        """Change the concurrency limits, resizing the pool if required.

        Arguments:
            max (int): New maximum.  If the pool is currently larger it is
                shrunk at once; the consumer prefetch count is adjusted by
                the difference to the previous maximum.
            min (int): New minimum.  If the pool is currently smaller it
                is grown at once.

        Returns:
            Tuple[int, int]: The resulting ``(max_concurrency,
            min_concurrency)``.
        """
        with self.mutex:
            if max is not None:
                if max < self.processes:
                    self._shrink(self.processes - max)
                # Must run before the assignment below: the helper
                # computes the delta against the old maximum.
                self._update_consumer_prefetch_count(max)
                self.max_concurrency = max
            if min is not None:
                if min > self.processes:
                    self._grow(min - self.processes)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency

    def scale_up(self, n):
        """Grow the pool by ``n`` and remember when that happened."""
        self._last_scale_up = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        """Shrink the pool by ``n`` once the keepalive period has elapsed.

        Nothing happens if no scale-up has been recorded yet, or if the
        last one is not more than ``keepalive`` in the past.
        """
        # Compare against None explicitly: the reference point of
        # ``monotonic()`` is undefined, so a recorded timestamp of 0.0
        # must not be mistaken for "never scaled up".
        if self._last_scale_up is not None and (
                monotonic() - self._last_scale_up > self.keepalive):
            return self._shrink(n)

    def _grow(self, n):
        """Log and add ``n`` processes to the pool."""
        info('Scaling up %s processes.', n)
        self.pool.grow(n)

    def _shrink(self, n):
        """Log and try to remove ``n`` processes from the pool.

        Errors from ``pool.shrink`` never propagate: ``ValueError`` is
        logged at debug level (reported as "all processes busy"), any
        other exception is logged as an error with traceback.
        """
        info('Scaling down %s processes.', n)
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)

    def _update_consumer_prefetch_count(self, new_max):
        """Shift the consumer prefetch count by the change in maximum.

        Does nothing when the maximum is unchanged or when the autoscaler
        was created without a worker.
        """
        diff = new_max - self.max_concurrency
        # ``worker`` defaults to None in ``__init__``; without the guard
        # `update(max=...)` would fail with AttributeError in that case.
        if diff and self.worker is not None:
            self.worker.consumer._update_prefetch_count(
                diff
            )

    def info(self):
        """Return a dict with the limits, pool size and current demand."""
        return {
            'max': self.max_concurrency,
            'min': self.min_concurrency,
            'current': self.processes,
            'qty': self.qty,
        }

    @property
    def qty(self):
        """Number of requests currently in ``state.reserved_requests``."""
        return len(state.reserved_requests)

    @property
    def processes(self):
        """Current pool size as reported by ``pool.num_processes``."""
        return self.pool.num_processes