Skip to content

Commit

Permalink
Merge 0f4451f into ca7d79a
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed Aug 8, 2016
2 parents ca7d79a + 0f4451f commit 195aca8
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 200 deletions.
6 changes: 1 addition & 5 deletions aiohttp/errors.py
Expand Up @@ -159,11 +159,7 @@ def __init__(self, line=''):
self.line = line


class ParserError(Exception):
"""Base parser error."""


class LineLimitExceededParserError(ParserError):
class LineLimitExceededParserError(HttpBadRequest):
"""Line is too long."""

def __init__(self, msg, limit):
Expand Down
182 changes: 58 additions & 124 deletions aiohttp/server.py
Expand Up @@ -6,11 +6,10 @@
import traceback
import warnings
from html import escape as html_escape
from math import ceil

import aiohttp
from aiohttp import errors, hdrs, helpers, streams
from aiohttp.helpers import _get_kwarg, ensure_future
from aiohttp.helpers import Timeout, _get_kwarg, ensure_future
from aiohttp.log import access_logger, server_logger

__all__ = ('ServerHttpProtocol',)
Expand Down Expand Up @@ -53,15 +52,11 @@ class ServerHttpProtocol(aiohttp.StreamProtocol):
:param keepalive_timeout: number of seconds before closing
keep-alive connection
:type keepalive: int or None
:type keepalive_timeout: int or None
:param bool tcp_keepalive: TCP keep-alive is on, default is on
:param int timeout: slow request timeout
:param allowed_methods: (optional) List of allowed request methods.
Set to empty list to allow all methods.
:type allowed_methods: tuple
:param int slow_request_timeout: slow request timeout
:param bool debug: enable debug mode
Expand All @@ -86,8 +81,6 @@ class ServerHttpProtocol(aiohttp.StreamProtocol):
_request_handler = None
_reading_request = False
_keep_alive = False # keep transport open
_keep_alive_handle = None # keep alive timer handle
_slow_request_timeout_handle = None # slow request timer handle

def __init__(self, *, loop=None,
keepalive_timeout=75, # NGINX default value is 75 secs
Expand Down Expand Up @@ -138,6 +131,7 @@ def __init__(self, *, loop=None,
access_log_format)
else:
self.access_logger = None
self._closing = False

@property
def keep_alive_timeout(self):
Expand All @@ -157,6 +151,7 @@ def closing(self, timeout=15.0):
self._keep_alive = False
self._tcp_keep_alive = False
self._keepalive_timeout = None
self._closing = True

if (not self._reading_request and self.transport is not None):
if self._request_handler:
Expand All @@ -165,27 +160,12 @@ def closing(self, timeout=15.0):

self.transport.close()
self.transport = None
elif self.transport is not None and timeout:
if self._slow_request_timeout_handle is not None:
self._slow_request_timeout_handle.cancel()

# use slow request timeout for closing
# connection_lost cleans timeout handler
now = self._loop.time()
self._slow_request_timeout_handle = self._loop.call_at(
ceil(now+timeout), self.cancel_slow_request)

def connection_made(self, transport):
super().connection_made(transport)

self._request_handler = ensure_future(self.start(), loop=self._loop)

# start slow request timer
if self._slow_request_timeout:
now = self._loop.time()
self._slow_request_timeout_handle = self._loop.call_at(
ceil(now+self._slow_request_timeout), self.cancel_slow_request)

if self._tcp_keepalive:
tcp_keepalive(self, transport)

Expand All @@ -195,12 +175,6 @@ def connection_lost(self, exc):
if self._request_handler is not None:
self._request_handler.cancel()
self._request_handler = None
if self._keep_alive_handle is not None:
self._keep_alive_handle.cancel()
self._keep_alive_handle = None
if self._slow_request_timeout_handle is not None:
self._slow_request_timeout_handle.cancel()
self._slow_request_timeout_handle = None

def data_received(self, data):
super().data_received(data)
Expand All @@ -209,11 +183,6 @@ def data_received(self, data):
if not self._reading_request:
self._reading_request = True

# stop keep-alive timer
if self._keep_alive_handle is not None:
self._keep_alive_handle.cancel()
self._keep_alive_handle = None

def keep_alive(self, val):
"""Set keep-alive connection mode.
Expand All @@ -233,16 +202,6 @@ def log_debug(self, *args, **kw):
def log_exception(self, *args, **kw):
self.logger.exception(*args, **kw)

def cancel_slow_request(self):
if self._request_handler is not None:
self._request_handler.cancel()
self._request_handler = None

if self.transport is not None:
self.transport.close()

self.log_debug('Close slow request.')

@asyncio.coroutine
def start(self):
"""Start processing of incoming requests.
Expand All @@ -255,44 +214,35 @@ def start(self):
"""
reader = self.reader

while True:
message = None
self._keep_alive = False
self._request_count += 1
self._reading_request = False

payload = None
try:
# read HTTP request method
prefix = reader.set_parser(self._request_prefix)
yield from prefix.read()

# start reading request
self._reading_request = True

# start slow request timer
if (self._slow_request_timeout and
self._slow_request_timeout_handle is None):
now = self._loop.time()
self._slow_request_timeout_handle = self._loop.call_at(
ceil(now+self._slow_request_timeout),
self.cancel_slow_request)

# read request headers
httpstream = reader.set_parser(self._request_parser)
message = yield from httpstream.read()

# cancel slow request timer
if self._slow_request_timeout_handle is not None:
self._slow_request_timeout_handle.cancel()
self._slow_request_timeout_handle = None
try:
while not self._closing:
message = None
self._keep_alive = False
self._request_count += 1
self._reading_request = False

payload = None
with Timeout(max(self._slow_request_timeout,
self._keepalive_timeout),
loop=self._loop):
# read HTTP request method
prefix = reader.set_parser(self._request_prefix)
yield from prefix.read()

# start reading request
self._reading_request = True

# start slow request timer
# read request headers
httpstream = reader.set_parser(self._request_parser)
message = yield from httpstream.read()

# request may not have payload
try:
content_length = int(
message.headers.get(hdrs.CONTENT_LENGTH, 0))
except ValueError:
content_length = 0
raise errors.InvalidHeader(hdrs.CONTENT_LENGTH) from None

if (content_length > 0 or
message.method == 'CONNECT' or
Expand All @@ -308,55 +258,39 @@ def start(self):

yield from self.handle_request(message, payload)

except asyncio.CancelledError:
return
except errors.ClientDisconnectedError:
self.log_debug(
'Ignored premature client disconnection #1.')
return
except errors.HttpProcessingError as exc:
if self.transport is not None:
yield from self.handle_error(exc.code, message,
None, exc, exc.headers,
exc.message)
except errors.LineLimitExceededParserError as exc:
yield from self.handle_error(400, message, None, exc)
except Exception as exc:
yield from self.handle_error(500, message, None, exc)
finally:
if self.transport is None:
self.log_debug(
'Ignored premature client disconnection #2.')
return

if payload and not payload.is_eof():
self.log_debug('Uncompleted request.')
self._request_handler = None
self.transport.close()
return
self._closing = True
else:
reader.unset_parser()

if self._request_handler:
if self._keep_alive and self._keepalive_timeout:
self.log_debug(
'Start keep-alive timer for %s sec.',
self._keepalive_timeout)
now = self._loop.time()
self._keep_alive_handle = self._loop.call_at(
ceil(now+self._keepalive_timeout),
self.transport.close)
elif self._keep_alive:
# do nothing, rely on kernel or upstream server
pass
else:
self.log_debug('Close client connection.')
self._request_handler = None
self.transport.close()
return
else:
# connection is closed
return
if not self._keep_alive or not self._keepalive_timeout:
self._closing = True

except asyncio.CancelledError:
self.log_debug(
'Request handler cancelled.')
return
except asyncio.TimeoutError:
self.log_debug(
'Request handler timed out.')
return
except errors.ClientDisconnectedError:
self.log_debug(
'Ignored premature client disconnection #1.')
return
except errors.HttpProcessingError as exc:
yield from self.handle_error(exc.code, message,
None, exc, exc.headers,
exc.message)
except Exception as exc:
yield from self.handle_error(500, message, None, exc)
finally:
self._request_handler = None
if self.transport is None:
self.log_debug(
'Ignored premature client disconnection #2.')
else:
self.transport.close()

def handle_error(self, status=500, message=None,
payload=None, exc=None, headers=None, reason=None):
Expand All @@ -366,7 +300,7 @@ def handle_error(self, status=500, message=None,
information. It always closes current connection."""
now = self._loop.time()
try:
if self._request_handler is None:
if self.transport is None:
# client has been disconnected during writing.
return ()

Expand Down

0 comments on commit 195aca8

Please sign in to comment.