Skip to content

Commit

Permalink
load application from factory function (#2178)
Browse files Browse the repository at this point in the history
* load application from factory function

Use `ast.parse` to validate that the string passed to the CLI is either
an attribute name or a function call. Use `ast.literal_eval` to parse
any positional and keyword arguments to the function. Call the function
to get the real application.

Co-authored-by: Connor Brinton <connor.brinton@gmail.com>

* test coverage for util.import_app

* document app factory pattern
  • Loading branch information
davidism authored and Benoit Chesneau committed Nov 19, 2019
1 parent 94ab209 commit 19cb68f
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 24 deletions.
21 changes: 21 additions & 0 deletions docs/source/run.rst
Expand Up @@ -44,8 +44,29 @@ Example with the test app:
You can now run the app with the following command::

.. code-block:: text
$ gunicorn --workers=2 test:app
The variable name can also be a function call. In that case the name
will be imported from the module, then called to get the application
object. This is commonly referred to as the "application factory"
pattern.

.. code-block:: python
def create_app():
app = FrameworkApp()
...
return app
.. code-block:: text
$ gunicorn --workers=2 'test:create_app()'
Positional and keyword arguments can also be passed, but it is
recommended to load configuration from environment variables rather than
the command line.

Commonly Used Arguments
^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
84 changes: 81 additions & 3 deletions gunicorn/util.py
Expand Up @@ -2,7 +2,7 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import ast
import email.utils
import errno
import fcntl
Expand Down Expand Up @@ -320,6 +320,32 @@ def write_error(sock, status_int, reason, mesg):
write_nonblock(sock, http.encode('latin1'))


def _called_with_wrong_args(f):
"""Check whether calling a function raised a ``TypeError`` because
the call failed or because something in the function raised the
error.
:param f: The function that was called.
:return: ``True`` if the call failed.
"""
tb = sys.exc_info()[2]

try:
while tb is not None:
if tb.tb_frame.f_code is f.__code__:
# In the function, it was called successfully.
return False

tb = tb.tb_next

# Didn't reach the function.
return True
finally:
# Delete tb to break a circular reference in Python 2.
# https://docs.python.org/2/library/sys.html#sys.exc_info
del tb


def import_app(module):
parts = module.split(":", 1)
if len(parts) == 1:
Expand All @@ -335,13 +361,65 @@ def import_app(module):
raise ImportError(msg % (module.rsplit(".", 1)[0], obj))
raise

# Parse obj as a single expression to determine if it's a valid
# attribute name or function call.
try:
expression = ast.parse(obj, mode="eval").body
except SyntaxError:
raise AppImportError(
"Failed to parse %r as an attribute name or function call." % obj
)

if isinstance(expression, ast.Name):
name = expression.id
args = kwargs = None
elif isinstance(expression, ast.Call):
# Ensure the function name is an attribute name only.
if not isinstance(expression.func, ast.Name):
raise AppImportError("Function reference must be a simple name: %r" % obj)

name = expression.func.id

# Parse the positional and keyword arguments as literals.
try:
args = [ast.literal_eval(arg) for arg in expression.args]
kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in expression.keywords}
except ValueError:
# literal_eval gives cryptic error messages, show a generic
# message with the full expression instead.
raise AppImportError(
"Failed to parse arguments as literal values: %r" % obj
)
else:
raise AppImportError(
"Failed to parse %r as an attribute name or function call." % obj
)

is_debug = logging.root.level == logging.DEBUG
try:
app = getattr(mod, obj)
app = getattr(mod, name)
except AttributeError:
if is_debug:
traceback.print_exception(*sys.exc_info())
raise AppImportError("Failed to find application object %r in %r" % (obj, module))
raise AppImportError("Failed to find attribute %r in %r." % (name, module))

# If the expression was a function call, call the retrieved object
# to get the real application.
if args is not None:
try:
app = app(*args, **kwargs)
except TypeError as e:
# If the TypeError was due to bad arguments to the factory
# function, show Python's nice error message without a
# traceback.
if _called_with_wrong_args(app):
raise AppImportError(
"".join(traceback.format_exception_only(TypeError, e)).strip()
)

# Otherwise it was raised from within the function, show the
# full traceback.
raise

if app is None:
raise AppImportError("Failed to find application object: %r" % obj)
Expand Down
39 changes: 26 additions & 13 deletions tests/support.py
Expand Up @@ -7,19 +7,32 @@
HOST = "127.0.0.1"


@validator
def app(environ, start_response):
"""Simplest possible application object"""

data = b'Hello, World!\n'
status = '200 OK'

response_headers = [
('Content-type', 'text/plain'),
('Content-Length', str(len(data))),
]
start_response(status, response_headers)
return iter([data])
def create_app(name="World", count=1):
message = (('Hello, %s!\n' % name) * count).encode("utf8")
length = str(len(message))

@validator
def app(environ, start_response):
"""Simplest possible application object"""

status = '200 OK'

response_headers = [
('Content-type', 'text/plain'),
('Content-Length', length),
]
start_response(status, response_headers)
return iter([message])

return app


app = application = create_app()
none_app = None


def error_factory():
raise TypeError("inner")


def requires_mac_ver(*min_version):
Expand Down
49 changes: 41 additions & 8 deletions tests/test_util.py
Expand Up @@ -2,6 +2,7 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import os

import pytest

Expand Down Expand Up @@ -60,17 +61,49 @@ def test_warn(capsys):
assert '!!! WARNING: test warn' in err


def test_import_app():
assert util.import_app('support:app')
@pytest.mark.parametrize(
"value",
[
"support",
"support:app",
"support:create_app()",
"support:create_app('Gunicorn', 3)",
"support:create_app(count=3)",
],
)
def test_import_app_good(value):
assert util.import_app(value)


@pytest.mark.parametrize(
("value", "exc_type", "msg"),
[
("a:app", ImportError, "No module"),
("support:create_app(", AppImportError, "Failed to parse"),
("support:create.app()", AppImportError, "Function reference"),
("support:create_app(Gunicorn)", AppImportError, "literal values"),
("support:create.app", AppImportError, "attribute name"),
("support:wrong_app", AppImportError, "find attribute"),
("support:error_factory(1)", AppImportError, "error_factory() takes"),
("support:error_factory()", TypeError, "inner"),
("support:none_app", AppImportError, "find application object"),
("support:HOST", AppImportError, "callable"),
],
)
def test_import_app_bad(value, exc_type, msg):
with pytest.raises(exc_type) as exc_info:
util.import_app(value)

assert msg in str(exc_info.value)


def test_import_app_py_ext(monkeypatch):
monkeypatch.chdir(os.path.dirname(__file__))

with pytest.raises(ImportError) as exc_info:
util.import_app('a:app')
assert 'No module' in str(exc_info.value)
util.import_app("support.py")

with pytest.raises(AppImportError) as exc_info:
util.import_app('support:wrong_app')
msg = "Failed to find application object 'wrong_app' in 'support'"
assert msg in str(exc_info.value)
assert "did you mean" in str(exc_info.value)


def test_to_bytestring():
Expand Down

0 comments on commit 19cb68f

Please sign in to comment.