forked from tornadoweb/tornado
/
ioloop.py
399 lines (335 loc) · 13 KB
/
ioloop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
#!/usr/bin/env python
#
# Copyright 2009 Facebook
# Copyright 2010 Dusty Phillips
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""A level-triggered I/O loop for non-blocking sockets."""
import bisect
import errno
import fcntl
import logging
import os
import select
import time
class IOLoop:
"""A level-triggered I/O loop.
We use epoll if it is available, or else we fall back on select(). If
you are implementing a system that needs to handle 1000s of simultaneous
connections, you should use Linux and either compile our epoll module or
use Python 2.6+ to get epoll support.
Example usage for a simple TCP server:
import errno
import functools
import ioloop
import socket
def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error, e:
if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", port))
sock.listen(128)
io_loop = ioloop.IOLoop.instance()
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
"""
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
def __init__(self, impl=None):
self._impl = impl or _poll()
self._handlers = {}
self._events = {}
self._callbacks = set()
self._timeouts = []
self._running = False
self._stopped = False
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
r, w = os.pipe()
self._set_nonblocking(r)
self._set_nonblocking(w)
self._waker_reader = os.fdopen(r, "rb", 0)
self._waker_writer = os.fdopen(w, "wb", 0)
self.add_handler(r, self._read_waker, self.WRITE)
@classmethod
def instance(cls):
"""Returns a global IOLoop instance.
Most single-threaded applications have a single, global IOLoop.
Use this method instead of passing around IOLoop instances
throughout your code.
A common pattern for classes that depend on IOLoops is to use
a default argument to enable programs with multiple IOLoops
but not require the argument for simpler applications:
class MyClass:
def __init__(self, io_loop=None):
self.io_loop = io_loop or IOLoop.instance()
"""
if not hasattr(cls, "_instance"):
cls._instance = cls()
return cls._instance
@classmethod
def initialized(cls):
return hasattr(cls, "_instance")
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
self._handlers[fd] = handler
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
"""Changes the events we listen for fd."""
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
"""Stop listening for events on fd."""
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except OSError:
logging.debug("Error deleting fd from IOLoop", exc_info=True)
def start(self):
"""Starts the I/O loop.
The loop will run until one of the I/O handlers calls stop(), which
will make the loop stop after the current event iteration completes.
"""
if self._stopped:
self._stopped = False
return
self._running = True
while True:
# Never use an infinite timeout here - it can stall epoll
poll_timeout = 0.2
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
callbacks = list(self._callbacks)
for callback in callbacks:
# A callback can add or remove other callbacks
if callback in self._callbacks:
self._callbacks.remove(callback)
self._run_callback(callback)
if self._callbacks:
poll_timeout = 0.0
if self._timeouts:
now = time.time()
while self._timeouts and self._timeouts[0].deadline <= now:
timeout = self._timeouts.pop(0)
self._run_callback(timeout.callback)
if self._timeouts:
milliseconds = self._timeouts[0].deadline - now
poll_timeout = min(milliseconds, poll_timeout)
if not self._running:
break
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
if e.args == (4, "Interrupted system call"):
logging.warning("Interrupted system call", exc_info=1)
continue
else:
raise
# Pop one fd at a time from the set of pending fds and run
# its handler. Since that handler may perform actions on
# other file descriptors, there may be reentrant calls to
# this IOLoop that update self._events
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
self._handlers[fd](fd, events)
except OSError as e:
if e[0] == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
except Exception:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
def stop(self):
"""Stop the loop after the current event loop iteration is complete.
If the event loop is not currently running, the next call to start()
will return immediately.
To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this:
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
ioloop.start() will return after async_method has run its callback,
whether that callback was invoked before or after ioloop.start.
"""
self._running = False
self._stopped = True
self._wake()
def running(self):
"""Returns true if this IOLoop is currently running."""
return self._running
def add_timeout(self, deadline, callback):
"""Calls the given callback at the time deadline from the I/O loop."""
timeout = _Timeout(deadline, callback)
bisect.insort(self._timeouts, timeout)
return timeout
def remove_timeout(self, timeout):
self._timeouts.remove(timeout)
def add_callback(self, callback):
"""Calls the given callback on the next I/O loop iteration."""
self._callbacks.add(callback)
self._wake()
def remove_callback(self, callback):
"""Removes the given callback from the next I/O loop iteration."""
self._callbacks.remove(callback)
def _wake(self):
try:
self._waker_writer.write("x")
except IOError:
pass
def _run_callback(self, callback):
try:
callback()
except Exception:
logging.error("Exception in callback %r", callback, exc_info=True)
def _read_waker(self, fd, events):
try:
while True:
self._waker_reader.read()
except IOError:
pass
def _set_nonblocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
class _Timeout:
"""An IOLoop timeout, a UNIX timestamp and a callback"""
def __init__(self, deadline, callback):
self.deadline = deadline
self.callback = callback
def __lt__(self, other):
return (self.deadline, id(self.callback)) < (
other.deadline, id(other.callback))
class PeriodicCallback:
"""Schedules the given callback to be called periodically.
The callback is called every callback_time milliseconds.
"""
def __init__(self, callback, callback_time, io_loop=None):
self.callback = callback
self.callback_time = callback_time
self.io_loop = io_loop or IOLoop.instance()
self._running = True
def start(self):
timeout = time.time() + self.callback_time / 1000.0
self.io_loop.add_timeout(timeout, self._run)
def stop(self):
self._running = False
def _run(self):
if not self._running: return
try:
self.callback()
except Exception:
logging.error("Error in periodic callback", exc_info=True)
self.start()
class _KQueue:
"""A kqueue-based event loop for BSD/Mac systems."""
def __init__(self):
self._kqueue = select.kqueue()
self._filters = {}
def register(self, fd, events):
filter = 0
if events & IOLoop.WRITE:
filter |= select.KQ_FILTER_WRITE
if events & IOLoop.READ or filter == 0:
filter |= select.KQ_FILTER_READ
self._filters[fd] = filter
kevent = select.kevent(fd, filter=filter)
self._kqueue.control([kevent], 0)
def modify(self, fd, events):
self.unregister(fd)
self.register(fd, events)
def unregister(self, fd):
kevent = select.kevent(fd, filter=self._filters[fd],
flags=select.KQ_EV_DELETE)
self._kqueue.control([kevent], 0)
def poll(self, timeout):
kevents = self._kqueue.control(None, 1000, timeout)
events = []
for kevent in kevents:
fd = kevent.ident
flags = 0
if kevent.filter & select.KQ_FILTER_READ:
flags |= IOLoop.READ
if kevent.filter & select.KQ_FILTER_WRITE:
flags |= IOLoop.WRITE
if kevent.flags & select.KQ_EV_ERROR:
flags |= IOLoop.ERROR
events.append((fd, flags))
return events
class _Select:
"""A simple, select()-based IOLoop implementation for non-Linux systems"""
def __init__(self):
self.read_fds = set()
self.write_fds = set()
self.error_fds = set()
self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
def register(self, fd, events):
if events & IOLoop.READ: self.read_fds.add(fd)
if events & IOLoop.WRITE: self.write_fds.add(fd)
if events & IOLoop.ERROR: self.error_fds.add(fd)
def modify(self, fd, events):
self.unregister(fd)
self.register(fd, events)
def unregister(self, fd):
self.read_fds.discard(fd)
self.write_fds.discard(fd)
self.error_fds.discard(fd)
def poll(self, timeout):
readable, writeable, errors = select.select(
self.read_fds, self.write_fds, self.error_fds, timeout)
events = {}
for fd in readable:
events[fd] = events.get(fd, 0) | IOLoop.READ
for fd in writeable:
events[fd] = events.get(fd, 0) | IOLoop.WRITE
for fd in errors:
events[fd] = events.get(fd, 0) | IOLoop.ERROR
return list(events.items())
# Choose a poll implementation. Use epoll if it is available, KQueue otherwise.
# Fall back to select() for non-Linux platforms
try:
_poll = select.epoll
except AttributeError:
try:
kqueue = select.KQueue
except AttributeError:
_poll = _Select
else:
_poll = _KQueue