Permalink
Fetching contributors…
Cannot retrieve contributors at this time
218 lines (184 sloc) 5.52 KB
##############################################################################
#
# Copyright (c) 2004 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Utility functions
"""
import asyncore
import errno
import logging
import os
import re
import stat
import time
import calendar
logger = logging.getLogger('waitress')
queue_logger = logging.getLogger('waitress.queue')
def find_double_newline(s):
"""Returns the position just after a double newline in the given string."""
pos1 = s.find(b'\n\r\n') # One kind of double newline
if pos1 >= 0:
pos1 += 3
pos2 = s.find(b'\n\n') # Another kind of double newline
if pos2 >= 0:
pos2 += 2
if pos1 >= 0:
if pos2 >= 0:
return min(pos1, pos2)
else:
return pos1
else:
return pos2
def concat(*args):
return ''.join(args)
def join(seq, field=' '):
return field.join(seq)
def group(s):
return '(' + s + ')'
short_days = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat']
long_days = ['sunday', 'monday', 'tuesday', 'wednesday',
'thursday', 'friday', 'saturday']
short_day_reg = group(join(short_days, '|'))
long_day_reg = group(join(long_days, '|'))
daymap = {}
for i in range(7):
daymap[short_days[i]] = i
daymap[long_days[i]] = i
hms_reg = join(3 * [group('[0-9][0-9]')], ':')
months = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul',
'aug', 'sep', 'oct', 'nov', 'dec']
monmap = {}
for i in range(12):
monmap[months[i]] = i + 1
months_reg = group(join(months, '|'))
# From draft-ietf-http-v11-spec-07.txt/3.3.1
# Sun, 06 Nov 1994 08:49:37 GMT ; RFC 822, updated by RFC 1123
# Sunday, 06-Nov-94 08:49:37 GMT ; RFC 850, obsoleted by RFC 1036
# Sun Nov 6 08:49:37 1994 ; ANSI C's asctime() format
# rfc822 format
rfc822_date = join(
[concat(short_day_reg, ','), # day
group('[0-9][0-9]?'), # date
months_reg, # month
group('[0-9]+'), # year
hms_reg, # hour minute second
'gmt'
],
' '
)
rfc822_reg = re.compile(rfc822_date)
def unpack_rfc822(m):
g = m.group
return (
int(g(4)), # year
monmap[g(3)], # month
int(g(2)), # day
int(g(5)), # hour
int(g(6)), # minute
int(g(7)), # second
0,
0,
0,
)
# rfc850 format
rfc850_date = join(
[concat(long_day_reg, ','),
join(
[group('[0-9][0-9]?'),
months_reg,
group('[0-9]+')
],
'-'
),
hms_reg,
'gmt'
],
' '
)
rfc850_reg = re.compile(rfc850_date)
# they actually unpack the same way
def unpack_rfc850(m):
g = m.group
yr = g(4)
if len(yr) == 2:
yr = '19' + yr
return (
int(yr), # year
monmap[g(3)], # month
int(g(2)), # day
int(g(5)), # hour
int(g(6)), # minute
int(g(7)), # second
0,
0,
0
)
# parsdate.parsedate - ~700/sec.
# parse_http_date - ~1333/sec.
weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
monthname = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def build_http_date(when):
year, month, day, hh, mm, ss, wd, y, z = time.gmtime(when)
return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
weekdayname[wd],
day, monthname[month], year,
hh, mm, ss)
def parse_http_date(d):
d = d.lower()
m = rfc850_reg.match(d)
if m and m.end() == len(d):
retval = int(calendar.timegm(unpack_rfc850(m)))
else:
m = rfc822_reg.match(d)
if m and m.end() == len(d):
retval = int(calendar.timegm(unpack_rfc822(m)))
else:
return 0
return retval
class logging_dispatcher(asyncore.dispatcher):
logger = logger
def log_info(self, message, type='info'):
severity = {
'info': logging.INFO,
'warning': logging.WARN,
'error': logging.ERROR,
}
self.logger.log(severity.get(type, logging.INFO), message)
def cleanup_unix_socket(path):
try:
st = os.stat(path)
except OSError as exc:
if exc.errno != errno.ENOENT:
raise # pragma: no cover
else:
if stat.S_ISSOCK(st.st_mode):
try:
os.remove(path)
except OSError: # pragma: no cover
# avoid race condition error during tests
pass
class Error(object):
def __init__(self, body):
self.body = body
class BadRequest(Error):
code = 400
reason = 'Bad Request'
class RequestHeaderFieldsTooLarge(BadRequest):
code = 431
reason = 'Request Header Fields Too Large'
class RequestEntityTooLarge(BadRequest):
code = 413
reason = 'Request Entity Too Large'
class InternalServerError(Error):
code = 500
reason = 'Internal Server Error'