Skip to content

Commit

Permalink
Merge branch 'futures'
Browse files Browse the repository at this point in the history
  • Loading branch information
bdarnell committed Sep 29, 2012
2 parents 58bc5d3 + 541160c commit 6e7488f
Show file tree
Hide file tree
Showing 13 changed files with 578 additions and 74 deletions.
10 changes: 10 additions & 0 deletions tornado/autoreload.py
Expand Up @@ -71,6 +71,7 @@
import os
import pkgutil
import sys
import traceback
import types
import subprocess

Expand Down Expand Up @@ -278,7 +279,16 @@ def main():
except Exception, e:
logging.basicConfig()
gen_log.warning("Script exited with uncaught exception", exc_info=True)
# If an exception occurred at import time, the file with the error
# never made it into sys.modules and so we won't know to watch it.
# Just to make sure we've covered everything, walk the stack trace
# from the exception and watch every file.
for (filename, lineno, name, line) in traceback.extract_tb(sys.exc_info()[2]):
watch(filename)
if isinstance(e, SyntaxError):
# SyntaxErrors are special: their innermost stack frame is fake
# so extract_tb won't see it and we have to get the filename
# from the exception object.
watch(e.filename)
else:
logging.basicConfig()
Expand Down
127 changes: 127 additions & 0 deletions tornado/concurrent.py
@@ -0,0 +1,127 @@
#!/usr/bin/env python
#
# Copyright 2012 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, with_statement

import functools
import sys

from tornado.stack_context import ExceptionStackContext
from tornado.util import raise_exc_info

try:
from concurrent import futures
except ImportError:
futures = None


class DummyFuture(object):
def __init__(self):
self._done = False
self._result = None
self._exception = None
self._callbacks = []

def cancel(self):
return False

def cancelled(self):
return False

def running(self):
return not self._done

def done(self):
return self._done

def result(self, timeout=None):
self._check_done()
if self._exception:
raise self._exception
return self._result

def exception(self, timeout=None):
self._check_done()
if self._exception:
return self._exception
else:
return None

def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)

def set_result(self, result):
self._result = result
self._set_done()

def set_exception(self, exception):
self._exception = exception
self._set_done()

def _check_done(self):
if not self._done:
raise Exception("DummyFuture does not support blocking for results")

def _set_done(self):
self._done = True
for cb in self._callbacks:
# TODO: error handling
cb(self)
self._callbacks = None

if futures is None:
Future = DummyFuture
else:
Future = futures.Future

class DummyExecutor(object):
def submit(self, fn, *args, **kwargs):
future = Future()
try:
future.set_result(fn(*args, **kwargs))
except Exception, e:
future.set_exception(e)
return future

dummy_executor = DummyExecutor()

def run_on_executor(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback")
future = self.executor.submit(fn, self, *args, **kwargs)
if callback:
self.io_loop.add_future(future, callback)
return future
return wrapper

# TODO: this needs a better name
def future_wrap(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
future = Future()
if kwargs.get('callback') is not None:
future.add_done_callback(kwargs.pop('callback'))
kwargs['callback'] = future.set_result
def handle_error(typ, value, tb):
future.set_exception(value)
return True
with ExceptionStackContext(handle_error):
f(*args, **kwargs)
return future
return wrapper
23 changes: 23 additions & 0 deletions tornado/gen.py
Expand Up @@ -69,6 +69,8 @@ def get(self):
import sys
import types

from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from tornado.stack_context import ExceptionStackContext


Expand Down Expand Up @@ -247,6 +249,24 @@ def get_result(self):
return self.runner.pop_result(self.key)


class YieldFuture(YieldPoint):
def __init__(self, future, io_loop=None):
self.future = future
self.io_loop = io_loop or IOLoop.current()

def start(self, runner):
self.runner = runner
self.key = object()
runner.register_callback(self.key)
self.io_loop.add_future(self.future, runner.result_callback(self.key))

def is_ready(self):
return self.runner.is_ready(self.key)

def get_result(self):
return self.runner.pop_result(self.key).result()


class Multi(YieldPoint):
"""Runs multiple asynchronous operations in parallel.
Expand Down Expand Up @@ -360,6 +380,9 @@ def run(self):
raise
if isinstance(yielded, list):
yielded = Multi(yielded)
if isinstance(yielded, Future):
# TODO: lists of futures
yielded = YieldFuture(yielded)
if isinstance(yielded, YieldPoint):
self.yield_point = yielded
try:
Expand Down
39 changes: 39 additions & 0 deletions tornado/ioloop.py
Expand Up @@ -30,6 +30,7 @@

import datetime
import errno
import functools
import heapq
import logging
import os
Expand All @@ -39,6 +40,7 @@
import time
import traceback

from tornado.concurrent import DummyFuture
from tornado.log import app_log, gen_log
from tornado import stack_context

Expand All @@ -47,6 +49,11 @@
except ImportError:
signal = None

try:
from concurrent import futures
except ImportError:
futures = None

from tornado.platform.auto import set_close_exec, Waker


Expand Down Expand Up @@ -108,6 +115,8 @@ def connection_ready(sock, fd, events):
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()

_current = threading.local()

def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
Expand Down Expand Up @@ -167,6 +176,20 @@ def install(self):
assert not IOLoop.initialized()
IOLoop._instance = self

@staticmethod
def current():
current = getattr(IOLoop._current, "instance", None)
if current is None:
raise ValueError("no current IOLoop")
return current

def make_current(self):
IOLoop._current.instance = self

def clear_current(self):
assert IOLoop._current.instance is self
IOLoop._current.instance = None

def close(self, all_fds=False):
"""Closes the IOLoop, freeing any resources used.
Expand Down Expand Up @@ -267,6 +290,8 @@ def start(self):
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True

Expand Down Expand Up @@ -381,6 +406,7 @@ def start(self):
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)

Expand Down Expand Up @@ -490,6 +516,19 @@ def add_callback_from_signal(self, callback):
# but either way will work.
self._callbacks.append(stack_context.wrap(callback))

if futures is not None:
_FUTURE_TYPES = (futures.Future, DummyFuture)
else:
_FUTURE_TYPES = DummyFuture
def add_future(self, future, callback):
"""Schedules a callback on the IOLoop when the given future is finished.
"""
assert isinstance(future, IOLoop._FUTURE_TYPES)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(
functools.partial(callback, future)))

def _run_callback(self, callback):
try:
callback()
Expand Down
11 changes: 11 additions & 0 deletions tornado/netutil.py
Expand Up @@ -24,6 +24,7 @@
import stat

from tornado import process
from tornado.concurrent import dummy_executor, run_on_executor
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
from tornado.log import app_log
Expand Down Expand Up @@ -339,3 +340,13 @@ def accept_handler(fd, events):
raise
callback(connection, address)
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)


class Resolver(object):
def __init__(self, io_loop=None, executor=None):
self.io_loop = io_loop or IOLoop.instance()
self.executor = executor or dummy_executor

@run_on_executor
def getaddrinfo(self, *args, **kwargs):
return socket.getaddrinfo(*args, **kwargs)

0 comments on commit 6e7488f

Please sign in to comment.